1 /* 2 * Copyright (c) 2000-2020 Apple Inc. All rights reserved. 3 * 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ 5 * 6 * This file contains Original Code and/or Modifications of Original Code 7 * as defined in and that are subject to the Apple Public Source License 8 * Version 2.0 (the 'License'). You may not use this file except in 9 * compliance with the License. The rights granted to you under the License 10 * may not be used to create, or enable the creation or redistribution of, 11 * unlawful or unlicensed copies of an Apple operating system, or to 12 * circumvent, violate, or enable the circumvention or violation of, any 13 * terms of an Apple operating system software license agreement. 14 * 15 * Please obtain a copy of the License at 16 * http://www.opensource.apple.com/apsl/ and read it before using this file. 17 * 18 * The Original Code and all software distributed under the License are 19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 23 * Please see the License for the specific language governing rights and 24 * limitations under the License. 25 * 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ 27 */ 28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */ 29 30 #include <sys/cdefs.h> 31 32 #include <kern/assert.h> 33 #include <kern/ast.h> 34 #include <kern/clock.h> 35 #include <kern/cpu_data.h> 36 #include <kern/kern_types.h> 37 #include <kern/policy_internal.h> 38 #include <kern/processor.h> 39 #include <kern/sched_prim.h> /* for thread_exception_return */ 40 #include <kern/task.h> 41 #include <kern/thread.h> 42 #include <kern/thread_group.h> 43 #include <kern/zalloc.h> 44 #include <kern/work_interval.h> 45 #include <mach/kern_return.h> 46 #include <mach/mach_param.h> 47 #include <mach/mach_port.h> 48 #include <mach/mach_types.h> 49 #include <mach/mach_vm.h> 50 #include <mach/sync_policy.h> 51 #include <mach/task.h> 52 #include <mach/thread_act.h> /* for thread_resume */ 53 #include <mach/thread_policy.h> 54 #include <mach/thread_status.h> 55 #include <mach/vm_prot.h> 56 #include <mach/vm_statistics.h> 57 #include <machine/atomic.h> 58 #include <machine/machine_routines.h> 59 #include <machine/smp.h> 60 #include <vm/vm_map.h> 61 #include <vm/vm_protos.h> 62 63 #include <sys/eventvar.h> 64 #include <sys/kdebug.h> 65 #include <sys/kernel.h> 66 #include <sys/lock.h> 67 #include <sys/param.h> 68 #include <sys/proc_info.h> /* for fill_procworkqueue */ 69 #include <sys/proc_internal.h> 70 #include <sys/pthread_shims.h> 71 #include <sys/resourcevar.h> 72 #include <sys/signalvar.h> 73 #include <sys/sysctl.h> 74 #include <sys/sysproto.h> 75 #include <sys/systm.h> 76 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */ 77 78 #include <pthread/bsdthread_private.h> 79 #include <pthread/workqueue_syscalls.h> 80 #include <pthread/workqueue_internal.h> 81 #include <pthread/workqueue_trace.h> 82 83 #include <os/log.h> 84 85 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2; 86 87 static void workq_bound_thread_unpark_continue(void *uth, wait_result_t wr) __dead2; 88 89 static void workq_bound_thread_initialize_and_unpark_continue(void *uth, wait_result_t wr) __dead2; 90 91 static void workq_bound_thread_setup_and_run(struct uthread *uth, int setup_flags) __dead2; 92 93 static void workq_schedule_creator(proc_t p, 
struct workqueue *wq, 94 workq_kern_threadreq_flags_t flags); 95 96 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 97 workq_threadreq_t req); 98 99 static uint32_t workq_constrained_allowance(struct workqueue *wq, 100 thread_qos_t at_qos, struct uthread *uth, 101 bool may_start_timer, bool record_failed_allowance); 102 103 static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq); 104 105 static bool workq_thread_is_busy(uint64_t cur_ts, 106 _Atomic uint64_t *lastblocked_tsp); 107 108 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS; 109 110 static bool 111 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags); 112 113 static inline void 114 workq_lock_spin(struct workqueue *wq); 115 116 static inline void 117 workq_unlock(struct workqueue *wq); 118 119 #pragma mark globals 120 121 struct workq_usec_var { 122 uint32_t usecs; 123 uint64_t abstime; 124 }; 125 126 #define WORKQ_SYSCTL_USECS(var, init) \ 127 static struct workq_usec_var var = { .usecs = init }; \ 128 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \ 129 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \ 130 workq_sysctl_handle_usecs, "I", "") 131 132 static LCK_GRP_DECLARE(workq_lck_grp, "workq"); 133 os_refgrp_decl(static, workq_refgrp, "workq", NULL); 134 135 static ZONE_DEFINE(workq_zone_workqueue, "workq.wq", 136 sizeof(struct workqueue), ZC_NONE); 137 static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq", 138 sizeof(struct workq_threadreq_s), ZC_CACHING); 139 140 static struct mpsc_daemon_queue workq_deallocate_queue; 141 142 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS); 143 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS); 144 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS); 145 static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS; 146 static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8; 147 static uint32_t wq_init_constrained_limit = 1; 148 static uint16_t wq_death_max_load; 149 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS]; 150 151 /* 152 * This is not a hard limit but the max size we want to aim to hit across the 153 * entire cooperative pool. We can oversubscribe the pool due to non-cooperative 154 * workers and the max we will oversubscribe the pool by, is a total of 155 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS. 156 */ 157 static uint32_t wq_max_cooperative_threads; 158 159 static inline uint32_t 160 wq_cooperative_queue_max_size(struct workqueue *wq) 161 { 162 return wq->wq_cooperative_queue_has_limited_max_size ? 
	       1 : wq_max_cooperative_threads;
}

#pragma mark sysctls

static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");

static int
wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
{
#pragma unused(arg1, arg2, oidp)
	int input_pool_size = 0;
	int changed;
	int error = 0;

	error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
	if (error || !changed) {
		return error;
	}

#define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
#define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
	/* Not available currently, but the sysctl interface is designed to allow
	 * these extra parameters:
	 * WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all buckets)
	 * WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
	 */

	if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
	    && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
		error = EINVAL;
		goto out;
	}

	proc_t p = req->p;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq != NULL) {
		workq_lock_spin(wq);
		if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
			// Hackily enforce that the workqueue is still new (no requests or
			// threads)
			error = ENOTSUP;
		} else {
			wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
		}
		workq_unlock(wq);
	} else {
		/* This process has no workqueue, calling this sysctl makes no sense */
		return ENOTSUP;
	}

out:
	return error;
}

SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
    CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
    wq_limit_cooperative_threads_for_proc,
    "I", "Modify the max pool size of the cooperative pool");

#pragma mark p_wqptr

#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ?
NULL : wq; 256 } 257 258 static void 259 proc_set_wqptr(struct proc *p, struct workqueue *wq) 260 { 261 wq = os_atomic_xchg(&p->p_wqptr, wq, release); 262 if (wq == WQPTR_IS_INITING_VALUE) { 263 proc_lock(p); 264 thread_wakeup(&p->p_wqptr); 265 proc_unlock(p); 266 } 267 } 268 269 static bool 270 proc_init_wqptr_or_wait(struct proc *p) 271 { 272 struct workqueue *wq; 273 274 proc_lock(p); 275 wq = os_atomic_load(&p->p_wqptr, relaxed); 276 277 if (wq == NULL) { 278 os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed); 279 proc_unlock(p); 280 return true; 281 } 282 283 if (wq == WQPTR_IS_INITING_VALUE) { 284 assert_wait(&p->p_wqptr, THREAD_UNINT); 285 proc_unlock(p); 286 thread_block(THREAD_CONTINUE_NULL); 287 } else { 288 proc_unlock(p); 289 } 290 return false; 291 } 292 293 static inline event_t 294 workq_parked_wait_event(struct uthread *uth) 295 { 296 return (event_t)&uth->uu_workq_stackaddr; 297 } 298 299 static inline void 300 workq_thread_wakeup(struct uthread *uth) 301 { 302 thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth)); 303 } 304 305 #pragma mark wq_thactive 306 307 #if defined(__LP64__) 308 // Layout is: 309 // 127 - 115 : 13 bits of zeroes 310 // 114 - 112 : best QoS among all pending constrained requests 311 // 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits 312 #define WQ_THACTIVE_BUCKET_WIDTH 16 313 #define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH) 314 #else 315 // Layout is: 316 // 63 - 61 : best QoS among all pending constrained requests 317 // 60 : Manager bucket (0 or 1) 318 // 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits 319 #define WQ_THACTIVE_BUCKET_WIDTH 10 320 #define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1) 321 #endif 322 #define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1) 323 #define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1)) 324 325 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3, 326 "Make sure we have space to encode a QoS"); 327 328 static inline wq_thactive_t 329 _wq_thactive(struct workqueue *wq) 330 { 331 return os_atomic_load_wide(&wq->wq_thactive, relaxed); 332 } 333 334 static inline uint8_t 335 _wq_bucket(thread_qos_t qos) 336 { 337 // Map both BG and MT to the same bucket by over-shifting down and 338 // clamping MT and BG together. 339 switch (qos) { 340 case THREAD_QOS_MAINTENANCE: 341 return 0; 342 default: 343 return qos - 2; 344 } 345 } 346 347 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \ 348 ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT)) 349 350 static inline thread_qos_t 351 _wq_thactive_best_constrained_req_qos(struct workqueue *wq) 352 { 353 // Avoid expensive atomic operations: the three bits we're loading are in 354 // a single byte, and always updated under the workqueue lock 355 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive; 356 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v); 357 } 358 359 static void 360 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq) 361 { 362 thread_qos_t old_qos, new_qos; 363 workq_threadreq_t req; 364 365 req = priority_queue_max(&wq->wq_constrained_queue, 366 struct workq_threadreq_s, tr_entry); 367 new_qos = req ? 
req->tr_qos : THREAD_QOS_UNSPECIFIED; 368 old_qos = _wq_thactive_best_constrained_req_qos(wq); 369 if (old_qos != new_qos) { 370 long delta = (long)new_qos - (long)old_qos; 371 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT; 372 /* 373 * We can do an atomic add relative to the initial load because updates 374 * to this qos are always serialized under the workqueue lock. 375 */ 376 v = os_atomic_add(&wq->wq_thactive, v, relaxed); 377 #ifdef __LP64__ 378 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v, 379 (uint64_t)(v >> 64), 0); 380 #else 381 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0); 382 #endif 383 } 384 } 385 386 static inline wq_thactive_t 387 _wq_thactive_offset_for_qos(thread_qos_t qos) 388 { 389 uint8_t bucket = _wq_bucket(qos); 390 __builtin_assume(bucket < WORKQ_NUM_BUCKETS); 391 return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH); 392 } 393 394 static inline wq_thactive_t 395 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos) 396 { 397 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 398 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed); 399 } 400 401 static inline wq_thactive_t 402 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos) 403 { 404 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 405 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed); 406 } 407 408 static inline void 409 _wq_thactive_move(struct workqueue *wq, 410 thread_qos_t old_qos, thread_qos_t new_qos) 411 { 412 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) - 413 _wq_thactive_offset_for_qos(old_qos); 414 os_atomic_add(&wq->wq_thactive, v, relaxed); 415 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--; 416 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++; 417 } 418 419 static inline uint32_t 420 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v, 421 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount) 422 { 423 uint32_t count = 0, active; 424 uint64_t curtime; 425 426 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX); 427 428 if (busycount) { 429 curtime = mach_absolute_time(); 430 *busycount = 0; 431 } 432 if (max_busycount) { 433 *max_busycount = THREAD_QOS_LAST - qos; 434 } 435 436 uint8_t i = _wq_bucket(qos); 437 v >>= i * WQ_THACTIVE_BUCKET_WIDTH; 438 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) { 439 active = v & WQ_THACTIVE_BUCKET_MASK; 440 count += active; 441 442 if (busycount && wq->wq_thscheduled_count[i] > active) { 443 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) { 444 /* 445 * We only consider the last blocked thread for a given bucket 446 * as busy because we don't want to take the list lock in each 447 * sched callback. However this is an approximation that could 448 * contribute to thread creation storms. 
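				 *
				 * For example, if a bucket has 3 threads in
				 * wq_thscheduled_count but only 1 counted as
				 * active in `v`, the bucket contributes at most
				 * one "busy" thread to *busycount, and only if
				 * its wq_lastblocked_ts is within
				 * wq_stalled_window of `curtime`.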
449 */ 450 (*busycount)++; 451 } 452 } 453 } 454 455 return count; 456 } 457 458 /* The input qos here should be the requested QoS of the thread, not accounting 459 * for any overrides */ 460 static inline void 461 _wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos) 462 { 463 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--; 464 assert(old_scheduled_count > 0); 465 } 466 467 /* The input qos here should be the requested QoS of the thread, not accounting 468 * for any overrides */ 469 static inline void 470 _wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos) 471 { 472 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++; 473 assert(old_scheduled_count < UINT8_MAX); 474 } 475 476 #pragma mark wq_flags 477 478 static inline uint32_t 479 _wq_flags(struct workqueue *wq) 480 { 481 return os_atomic_load(&wq->wq_flags, relaxed); 482 } 483 484 static inline bool 485 _wq_exiting(struct workqueue *wq) 486 { 487 return _wq_flags(wq) & WQ_EXITING; 488 } 489 490 bool 491 workq_is_exiting(struct proc *p) 492 { 493 struct workqueue *wq = proc_get_wqptr(p); 494 return !wq || _wq_exiting(wq); 495 } 496 497 498 #pragma mark workqueue lock 499 500 static bool 501 workq_lock_is_acquired_kdp(struct workqueue *wq) 502 { 503 return kdp_lck_ticket_is_acquired(&wq->wq_lock); 504 } 505 506 static inline void 507 workq_lock_spin(struct workqueue *wq) 508 { 509 lck_ticket_lock(&wq->wq_lock, &workq_lck_grp); 510 } 511 512 static inline void 513 workq_lock_held(struct workqueue *wq) 514 { 515 LCK_TICKET_ASSERT_OWNED(&wq->wq_lock); 516 } 517 518 static inline bool 519 workq_lock_try(struct workqueue *wq) 520 { 521 return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp); 522 } 523 524 static inline void 525 workq_unlock(struct workqueue *wq) 526 { 527 lck_ticket_unlock(&wq->wq_lock); 528 } 529 530 #pragma mark idle thread lists 531 532 #define WORKQ_POLICY_INIT(qos) \ 533 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos } 534 535 static inline thread_qos_t 536 workq_pri_bucket(struct uu_workq_policy req) 537 { 538 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override); 539 } 540 541 static inline thread_qos_t 542 workq_pri_override(struct uu_workq_policy req) 543 { 544 return MAX(workq_pri_bucket(req), req.qos_bucket); 545 } 546 547 static inline bool 548 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth) 549 { 550 workq_threadreq_param_t cur_trp, req_trp = { }; 551 552 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params; 553 if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 554 req_trp = kqueue_threadreq_workloop_param(req); 555 } 556 557 /* 558 * CPU percent flags are handled separately to policy changes, so ignore 559 * them for all of these checks. 
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
		return true;
	}

#if CONFIG_PREADOPT_TG
	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
		/*
		 * Ideally, we'd add a check here to see if the thread's preadopt TG is
		 * the same as the thread request's thread group and short circuit if
		 * that is the case. But in the interest of keeping the code clean and
		 * not taking the thread lock here, we're going to skip this. We will
		 * eventually short circuit once we try to set the preadoption thread
		 * group on the thread.
		 */
		return true;
	}
#endif

	return false;
}

/* Input thread must be self. Called during self override, resetting overrides,
 * or while processing kevents.
 *
 * Called with the workq lock held; sometimes also with the thread mutex.
 */
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	assert(uth == current_uthread());

	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if ((old_bucket != new_bucket) &&
	    !workq_thread_is_permanently_bound(uth)) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (old_pri.qos_override != new_pri.qos_override) {
		thread_set_workq_override(get_machthread(uth), new_pri.qos_override);
	}

	if (wq->wq_reqcount &&
	    !workq_thread_is_permanently_bound(uth) &&
	    (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}
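
/*
 * Illustrative sketch (not part of the original source): how an override
 * typically flows through the policy helpers above. Assume a thread with
 * qos_req = THREAD_QOS_UTILITY, no higher qos_max, and no prior override, so
 * it is currently accounted in the UT bucket. If a kevent override raises
 * qos_override to THREAD_QOS_USER_INITIATED:
 *
 *	struct uu_workq_policy old_pri = uth->uu_workq_pri;
 *	struct uu_workq_policy new_pri = old_pri;
 *	new_pri.qos_override = THREAD_QOS_USER_INITIATED;
 *	// workq_pri_bucket(new_pri) is now IN, so workq_thread_update_bucket()
 *	// moves one count from the UT bucket to the IN bucket via
 *	// _wq_thactive_move() and applies the override with
 *	// thread_set_workq_override().
 *	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
 *
 * The real callers are the override and kevent paths, which already hold the
 * workq lock and run on the affected thread, per the comment above.
 */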
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread, removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}

/*
 * This function is always called with the workq lock, except for the
 * permanently bound workqueue thread, which instead requires the kqlock.
 * See the locking model for the bound thread's uu_workq_flags.
 */
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (_pthread_priority_has_sched_pri(mgr_pri)) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

#if CONFIG_PREADOPT_TG
	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
		/*
		 * For a kqwl permanently configured with a thread group, we can safely
		 * borrow the +1 ref from kqwl_preadopt_tg. A thread then takes an
		 * additional +1 ref for itself via thread_set_preadopt_thread_group.
		 *
		 * In all other cases, we cannot safely read and borrow the reference
		 * from the kqwl since it can disappear from under us at any time due
		 * to the max-ing logic in kqueue_set_preadopted_thread_group.
		 *
		 * As such, we do the following dance:
		 *
		 * 1) cmpxchng and steal the kqwl's preadopt thread group, leaving
		 * (NULL + QoS) behind. At this point, we have the reference
		 * to the thread group from the kqwl.
758 * 2) Have the thread set the preadoption thread group on itself. 759 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to 760 * thread_group + QoS. ie we try to give the reference back to the kqwl. 761 * If we fail, that's because a higher QoS thread group was set on the 762 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to 763 * go back to (1). 764 */ 765 766 _Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req); 767 768 thread_group_qos_t old_tg, new_tg; 769 int ret = 0; 770 again: 771 ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, { 772 if ((!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) || 773 KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) { 774 os_atomic_rmw_loop_give_up(break); 775 } 776 777 /* 778 * Leave the QoS behind - kqueue_set_preadopted_thread_group will 779 * only modify it if there is a higher QoS thread group to attach 780 */ 781 new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK); 782 }); 783 784 if (ret) { 785 /* 786 * We successfully took the ref from the kqwl so set it on the 787 * thread now 788 */ 789 thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg)); 790 791 thread_group_qos_t thread_group_to_expect = new_tg; 792 thread_group_qos_t thread_group_to_set = old_tg; 793 794 os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, { 795 if (old_tg != thread_group_to_expect) { 796 /* 797 * There was an intervening write to the kqwl_preadopt_tg, 798 * and it has a higher QoS than what we are working with 799 * here. Abandon our current adopted thread group and redo 800 * the full dance 801 */ 802 thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set)); 803 os_atomic_rmw_loop_give_up(goto again); 804 } 805 806 new_tg = thread_group_to_set; 807 }); 808 } else { 809 if (KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) { 810 thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg)); 811 } else { 812 /* Nothing valid on the kqwl, just clear what's on the thread */ 813 thread_set_preadopt_thread_group(th, NULL); 814 } 815 } 816 } else { 817 /* Not even a kqwl, clear what's on the thread */ 818 thread_set_preadopt_thread_group(th, NULL); 819 } 820 #endif 821 thread_set_workq_pri(th, qos, priority, policy); 822 } 823 824 /* 825 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held, 826 * every time a servicer is being told about a new max QoS. 
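 *
 * For example, if the servicer currently has uu_workq_pri.qos_max ==
 * THREAD_QOS_UTILITY and the knote's tr_kq_qos_index is
 * THREAD_QOS_USER_INITIATED, the function below raises qos_max under the
 * workq lock and workq_thread_update_bucket() moves the thread's active
 * accounting into the higher bucket.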
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}

#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}

static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have fewer than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}

static inline bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}

static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}

/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
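 *
 * As a worked example of the decay in workq_kill_delay_for_idle_thread()
 * above: with idle == wq_death_max_load the delay is the full
 * wq_reduce_pool_window (the 5s mentioned there); with
 * idle == wq_death_max_load + wq_max_constrained_threads / 2 it is roughly
 * half of that; and once idle exceeds
 * wq_death_max_load + wq_max_constrained_threads it bottoms out at
 * wq_reduce_pool_window / wq_max_constrained_threads.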
920 */ 921 static void 922 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement) 923 { 924 struct uthread *uth; 925 926 assert(wq->wq_thdying_count >= decrement); 927 if ((wq->wq_thdying_count -= decrement) > 0) { 928 return; 929 } 930 931 if (wq->wq_thidlecount <= 1) { 932 return; 933 } 934 935 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) { 936 return; 937 } 938 939 uint64_t now = mach_absolute_time(); 940 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 941 942 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) { 943 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START, 944 wq, wq->wq_thidlecount, 0, 0); 945 wq->wq_thdying_count++; 946 uth->uu_workq_flags |= UT_WORKQ_DYING; 947 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) { 948 workq_thread_wakeup(uth); 949 } 950 return; 951 } 952 953 workq_death_call_schedule(wq, 954 uth->uu_save.uus_workq_park_data.idle_stamp + delay); 955 } 956 957 void 958 workq_thread_terminate(struct proc *p, struct uthread *uth) 959 { 960 struct workqueue *wq = proc_get_wqptr_fast(p); 961 962 workq_lock_spin(wq); 963 if (!workq_thread_is_permanently_bound(uth)) { 964 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry); 965 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 966 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END, 967 wq, wq->wq_thidlecount, 0, 0); 968 workq_death_policy_evaluate(wq, 1); 969 } 970 } 971 if (wq->wq_nthreads-- == wq_max_threads) { 972 /* 973 * We got under the thread limit again, which may have prevented 974 * thread creation from happening, redrive if there are pending requests 975 */ 976 if (wq->wq_reqcount) { 977 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 978 } 979 } 980 workq_unlock(wq); 981 982 thread_deallocate(get_machthread(uth)); 983 } 984 985 static void 986 workq_kill_old_threads_call(void *param0, void *param1 __unused) 987 { 988 struct workqueue *wq = param0; 989 990 workq_lock_spin(wq); 991 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0); 992 os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed); 993 workq_death_policy_evaluate(wq, 0); 994 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0); 995 workq_unlock(wq); 996 } 997 998 static struct uthread * 999 workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags, 1000 bool *needs_wakeup) 1001 { 1002 struct uthread *uth; 1003 1004 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) { 1005 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 1006 } else { 1007 uth = TAILQ_FIRST(&wq->wq_thnewlist); 1008 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 1009 } 1010 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 1011 1012 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0); 1013 uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags; 1014 1015 /* A thread is never woken up as part of the cooperative pool */ 1016 assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0); 1017 1018 if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) { 1019 wq->wq_constrained_threads_scheduled++; 1020 } 1021 wq->wq_threads_scheduled++; 1022 wq->wq_thidlecount--; 1023 1024 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) { 1025 uth->uu_workq_flags ^= UT_WORKQ_DYING; 1026 workq_death_policy_evaluate(wq, 1); 1027 *needs_wakeup = false; 1028 } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) { 1029 *needs_wakeup = false; 1030 } else { 1031 *needs_wakeup = true; 1032 } 1033 return uth; 1034 } 1035 1036 /* 1037 * Called by thread_create_workq_waiting() during thread initialization, 
before 1038 * assert_wait, before the thread has been started. 1039 */ 1040 event_t 1041 workq_thread_init_and_wq_lock(task_t task, thread_t th) 1042 { 1043 struct uthread *uth = get_bsdthread_info(th); 1044 1045 uth->uu_workq_flags = UT_WORKQ_NEW; 1046 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY); 1047 uth->uu_workq_thport = MACH_PORT_NULL; 1048 uth->uu_workq_stackaddr = 0; 1049 uth->uu_workq_pthread_kill_allowed = 0; 1050 1051 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE); 1052 thread_reset_workq_qos(th, THREAD_QOS_LEGACY); 1053 1054 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task))); 1055 return workq_parked_wait_event(uth); 1056 } 1057 1058 /** 1059 * Try to add a new workqueue thread. 1060 * 1061 * - called with workq lock held 1062 * - dropped and retaken around thread creation 1063 * - return with workq lock held 1064 */ 1065 static kern_return_t 1066 workq_add_new_idle_thread( 1067 proc_t p, 1068 struct workqueue *wq, 1069 thread_continue_t continuation, 1070 bool is_permanently_bound, 1071 thread_t *new_thread) 1072 { 1073 mach_vm_offset_t th_stackaddr; 1074 kern_return_t kret; 1075 thread_t th; 1076 1077 wq->wq_nthreads++; 1078 1079 workq_unlock(wq); 1080 1081 vm_map_t vmap = get_task_map(proc_task(p)); 1082 1083 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr); 1084 if (kret != KERN_SUCCESS) { 1085 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 1086 kret, 1, 0); 1087 goto out; 1088 } 1089 1090 kret = thread_create_workq_waiting(proc_task(p), 1091 continuation, 1092 &th, 1093 is_permanently_bound); 1094 if (kret != KERN_SUCCESS) { 1095 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 1096 kret, 0, 0); 1097 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr); 1098 goto out; 1099 } 1100 1101 // thread_create_workq_waiting() will return with the wq lock held 1102 // on success, because it calls workq_thread_init_and_wq_lock() above 1103 1104 struct uthread *uth = get_bsdthread_info(th); 1105 uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr; 1106 1107 wq->wq_creations++; 1108 if (!is_permanently_bound) { 1109 wq->wq_thidlecount++; 1110 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry); 1111 } 1112 1113 if (new_thread) { 1114 *new_thread = th; 1115 } 1116 1117 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0); 1118 return kret; 1119 1120 out: 1121 workq_lock_spin(wq); 1122 /* 1123 * Do not redrive here if we went under wq_max_threads again, 1124 * it is the responsibility of the callers of this function 1125 * to do so when it fails. 
1126 */ 1127 wq->wq_nthreads--; 1128 return kret; 1129 } 1130 1131 static inline bool 1132 workq_thread_is_overcommit(struct uthread *uth) 1133 { 1134 return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0; 1135 } 1136 1137 static inline bool 1138 workq_thread_is_nonovercommit(struct uthread *uth) 1139 { 1140 return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | 1141 UT_WORKQ_COOPERATIVE)) == 0; 1142 } 1143 1144 static inline bool 1145 workq_thread_is_cooperative(struct uthread *uth) 1146 { 1147 return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0; 1148 } 1149 1150 bool 1151 workq_thread_is_permanently_bound(struct uthread *uth) 1152 { 1153 return (uth->uu_workq_flags & UT_WORKQ_PERMANENT_BIND) != 0; 1154 } 1155 1156 static inline void 1157 workq_thread_set_type(struct uthread *uth, uint16_t flags) 1158 { 1159 uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE); 1160 uth->uu_workq_flags |= flags; 1161 } 1162 1163 1164 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1 1165 1166 __attribute__((noreturn, noinline)) 1167 static void 1168 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq, 1169 struct uthread *uth, uint32_t death_flags, uint32_t setup_flags) 1170 { 1171 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 1172 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW; 1173 1174 if (qos > WORKQ_THREAD_QOS_CLEANUP) { 1175 workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true); 1176 qos = WORKQ_THREAD_QOS_CLEANUP; 1177 } 1178 1179 workq_thread_reset_cpupercent(NULL, uth); 1180 1181 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) { 1182 wq->wq_thidlecount--; 1183 if (first_use) { 1184 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 1185 } else { 1186 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 1187 } 1188 } 1189 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 1190 1191 workq_unlock(wq); 1192 1193 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 1194 __assert_only kern_return_t kr; 1195 kr = thread_set_voucher_name(MACH_PORT_NULL); 1196 assert(kr == KERN_SUCCESS); 1197 } 1198 1199 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS; 1200 thread_t th = get_machthread(uth); 1201 vm_map_t vmap = get_task_map(proc_task(p)); 1202 1203 if (!first_use) { 1204 flags |= WQ_FLAG_THREAD_REUSE; 1205 } 1206 1207 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 1208 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags); 1209 __builtin_unreachable(); 1210 } 1211 1212 bool 1213 workq_is_current_thread_updating_turnstile(struct workqueue *wq) 1214 { 1215 return wq->wq_turnstile_updater == current_thread(); 1216 } 1217 1218 __attribute__((always_inline)) 1219 static inline void 1220 workq_perform_turnstile_operation_locked(struct workqueue *wq, 1221 void (^operation)(void)) 1222 { 1223 workq_lock_held(wq); 1224 wq->wq_turnstile_updater = current_thread(); 1225 operation(); 1226 wq->wq_turnstile_updater = THREAD_NULL; 1227 } 1228 1229 static void 1230 workq_turnstile_update_inheritor(struct workqueue *wq, 1231 turnstile_inheritor_t inheritor, 1232 turnstile_update_flags_t flags) 1233 { 1234 if (wq->wq_inheritor == inheritor) { 1235 return; 1236 } 1237 wq->wq_inheritor = inheritor; 1238 workq_perform_turnstile_operation_locked(wq, ^{ 1239 turnstile_update_inheritor(wq->wq_turnstile, inheritor, 1240 flags | TURNSTILE_IMMEDIATE_UPDATE); 1241 turnstile_update_inheritor_complete(wq->wq_turnstile, 1242 TURNSTILE_INTERLOCK_HELD); 1243 }); 1244 } 1245 1246 static void 1247 workq_push_idle_thread(proc_t p, 
    struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_req;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}

#pragma mark thread requests

static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
}

static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT |
	       WORKQ_TR_FLAG_COOPERATIVE |
	       WORKQ_TR_FLAG_PERMANENT_BIND)) == 0;
}

static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
}

#define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
#define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
#define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)

static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}

static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	assert(!workq_tr_is_cooperative(req->tr_flags));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (workq_tr_is_overcommit(req->tr_flags)) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}

/* Calculates the number of threads scheduled >= the input QoS */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos_internal(struct workqueue *wq, thread_qos_t qos)
{
	uint64_t num_cooperative_threads = 0;

	for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
		uint8_t bucket = _wq_bucket(cur_qos);
		num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
	}

	return num_cooperative_threads;
}

/* Calculates the number of threads scheduled >= the input QoS */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos_locked(struct workqueue *wq, thread_qos_t qos)
{
	workq_lock_held(wq);
	return workq_num_cooperative_threads_scheduled_to_qos_internal(wq, qos);
}

static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
{
	return workq_num_cooperative_threads_scheduled_to_qos_locked(wq, WORKQ_THREAD_QOS_MIN);
}

static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			return true;
		}
	}

	return false;
}

/*
 * Determines the next QoS bucket we should service in the cooperative pool.
 * This function will always return a QoS for the cooperative pool as long as
 * there are requests to be serviced.
 *
 * Unlike the other thread pools, for the cooperative thread pool the scheduled
 * counts for the various buckets in the pool affect the next best request for
 * it.
 *
 * This function is called in the following contexts:
 *
 * a) When determining the best thread QoS for the cooperative bucket for the
 *    creator/thread reuse
 *
 * b) Once (a) has happened and a thread has bound to a thread request, figuring
 *    out whether the next best request for this pool has changed so that the
 *    creator can be scheduled.
 *
 * Returns true if the cooperative queue's best qos changed from its previous
 * value.
 */
static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
{
	workq_lock_held(wq);

	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;

	/* We determine the next best cooperative thread request based on the
	 * following:
	 *
	 * 1. Take the MAX of the following:
	 *    a) Highest qos with pending TRs such that the number of scheduled
	 *       threads so far with >= qos is < wq_max_cooperative_threads
	 *    b) Highest qos bucket with pending TRs but no scheduled threads
	 *       for that bucket
	 *
	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
	 *    pending thread requests in the pool.
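	 *
	 * For instance, with wq_cooperative_queue_max_size() == 4, scheduled
	 * counts of {UI: 2, IN: 2}, and pending requests at IN and UT: walking
	 * down from UI, the running scheduled count reaches 4 at IN, so IN does
	 * not qualify under (1a), and it already has scheduled threads, so it
	 * does not qualify under (1b) either. UT has a pending request and no
	 * scheduled threads, so (1b) picks UT and the best request QoS becomes
	 * UT, even though IN is the highest pending QoS; (2) would only be used
	 * if both (1a) and (1b) came up empty.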
	 *
	 */
	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;

	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;

	int scheduled_count_till_qos = 0;

	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
		scheduled_count_till_qos += scheduled_count_for_bucket;

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			if (qos > highest_qos_req) {
				highest_qos_req = qos;
			}
			/*
			 * The pool isn't saturated for threads at and above this QoS, and
			 * this qos bucket has pending requests
			 */
			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
				if (qos > highest_qos_req_with_width) {
					highest_qos_req_with_width = qos;
				}
			}

			/*
			 * There are no threads scheduled for this bucket but there
			 * is work pending, give it at least 1 thread
			 */
			if (scheduled_count_for_bucket == 0) {
				if (qos > highest_qos_with_no_scheduled) {
					highest_qos_with_no_scheduled = qos;
				}
			}
		}
	}

	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
	}

#if MACH_ASSERT
	/* Assert that if we are reporting the next best req as UN, then there
	 * actually is no thread request in the cooperative pool buckets */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		assert(!workq_has_cooperative_thread_requests(wq));
	}
#endif

	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
}

/*
 * Returns whether or not the input thread (or creator thread if uth is NULL)
 * should be allowed to work as part of the cooperative pool for the <input qos>
 * bucket.
 *
 * This function is called in a bunch of places:
 * a) Quantum expires for a thread and it is part of the cooperative pool
 * b) When trying to pick a thread request for the creator thread to
 *    represent.
 * c) When a thread is trying to pick a thread request to actually bind to
 *    and service.
 *
 * Called with workq lock held.
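 *
 * For example, with wq_cooperative_queue_max_size() == 4 and 4 cooperative
 * threads already scheduled, a request for a bucket that has pending work but
 * no thread servicing it is still admitted (WQ_COOPERATIVE_BUCKET_UNSERVICED),
 * while a request for a bucket that is already being serviced is admitted only
 * if fewer than 4 cooperative threads are scheduled at or above that QoS
 * (WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS).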
 */

#define WQ_COOPERATIVE_POOL_UNSATURATED 1
#define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
#define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3

static bool
workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
    bool may_start_timer)
{
	workq_lock_held(wq);

	bool exclude_thread_as_scheduled = false;
	bool passed_admissions = false;
	uint8_t bucket = _wq_bucket(qos);

	if (uth && workq_thread_is_cooperative(uth)) {
		exclude_thread_as_scheduled = true;
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
	}

	/*
	 * We have not saturated the pool yet, let this thread continue
	 */
	uint64_t total_cooperative_threads;
	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_POOL_UNSATURATED);
		goto out;
	}

	/*
	 * Without this thread, nothing is servicing the bucket which has pending
	 * work
	 */
	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
	if (bucket_scheduled == 0 &&
	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
		goto out;
	}

	/*
	 * If the number of threads scheduled at QoS buckets >= the input QoS
	 * exceeds the max we want for the pool, deny this thread
	 */
	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos_locked(wq, qos);
	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);

	if (!passed_admissions && may_start_timer) {
		workq_schedule_delayed_thread_creation(wq, 0);
	}

out:
	if (exclude_thread_as_scheduled) {
		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
	}
	return passed_admissions;
}

/*
 * returns true if the best request for the pool changed as a result of
 * enqueuing this thread request.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}

	if (workq_threadreq_is_cooperative(req)) {
		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);

		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
		STAILQ_INSERT_TAIL(bucket, req, tr_link);

		return _wq_cooperative_queue_refresh_best_req_qos(wq);
	}

	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);

	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
	    workq_priority_for_req(req), false);

	if (priority_queue_insert(q, &req->tr_entry)) {
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}

/*
 * returns true if one of the following is true (so as to update the creator if
 * needed):
 *
 * (a) the next highest request of the pool we dequeued the request from changed
 * (b) the next highest requests of the pool the current thread used to be a
 *     part of, changed
 *
 * For the overcommit, special and constrained pools, the next highest QoS for
 * each pool is just a MAX of the pending requests, so tracking (a) is sufficient.
 *
 * But for the cooperative thread pool, the next highest QoS for the pool depends
 * on the scheduled counts in the pool as well. So if the current thread used to
 * be cooperative in its previous logical run, i.e. (b), then that can also
 * affect the cooperative pool's next best QoS requests.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
    bool cooperative_sched_count_changed)
{
	wq->wq_reqcount--;

	bool next_highest_request_changed = false;

	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;

			/* If a cooperative thread was the one which picked up the manager
			 * thread request, we need to reevaluate the cooperative pool
			 * anyways.
			 */
			if (cooperative_sched_count_changed) {
				_wq_cooperative_queue_refresh_best_req_qos(wq);
			}
			return true;
		}

		if (workq_threadreq_is_cooperative(req)) {
			assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
			assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
			/* Account for the fact that BG and MT are coalesced when
			 * calculating the best request for the cooperative pool
			 */
			assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos));

			struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
			__assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);

			assert(head == req);
			STAILQ_REMOVE_HEAD(bucket, tr_link);

			/*
			 * If the request we're dequeueing is cooperative, then the sched
			 * counts definitely changed.
1725 */ 1726 assert(cooperative_sched_count_changed); 1727 } 1728 1729 /* 1730 * We want to do the cooperative pool refresh after dequeueing a 1731 * cooperative thread request if any (to combine both effects into 1 1732 * refresh operation) 1733 */ 1734 if (cooperative_sched_count_changed) { 1735 next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq); 1736 } 1737 1738 if (!workq_threadreq_is_cooperative(req)) { 1739 /* 1740 * All other types of requests are enqueued in priority queues 1741 */ 1742 1743 if (priority_queue_remove(workq_priority_queue_for_req(wq, req), 1744 &req->tr_entry)) { 1745 next_highest_request_changed |= true; 1746 if (workq_threadreq_is_nonovercommit(req)) { 1747 _wq_thactive_refresh_best_constrained_req_qos(wq); 1748 } 1749 } 1750 } 1751 } 1752 1753 return next_highest_request_changed; 1754 } 1755 1756 static void 1757 workq_threadreq_destroy(proc_t p, workq_threadreq_t req) 1758 { 1759 req->tr_state = WORKQ_TR_STATE_CANCELED; 1760 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) { 1761 kqueue_threadreq_cancel(p, req); 1762 } else { 1763 zfree(workq_zone_threadreq, req); 1764 } 1765 } 1766 1767 #pragma mark workqueue thread creation thread calls 1768 1769 static inline bool 1770 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend, 1771 uint32_t fail_mask) 1772 { 1773 uint32_t old_flags, new_flags; 1774 1775 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, { 1776 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) { 1777 os_atomic_rmw_loop_give_up(return false); 1778 } 1779 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) { 1780 new_flags = old_flags | pend; 1781 } else { 1782 new_flags = old_flags | sched; 1783 } 1784 }); 1785 1786 return (old_flags & WQ_PROC_SUSPENDED) == 0; 1787 } 1788 1789 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1 1790 1791 static bool 1792 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags) 1793 { 1794 assert(!preemption_enabled()); 1795 1796 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED, 1797 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED | 1798 WQ_IMMEDIATE_CALL_SCHEDULED)) { 1799 return false; 1800 } 1801 1802 uint64_t now = mach_absolute_time(); 1803 1804 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) { 1805 /* do not change the window */ 1806 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) { 1807 wq->wq_timer_interval *= 2; 1808 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) { 1809 wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime; 1810 } 1811 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) { 1812 wq->wq_timer_interval /= 2; 1813 if (wq->wq_timer_interval < wq_stalled_window.abstime) { 1814 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 1815 } 1816 } 1817 1818 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1819 _wq_flags(wq), wq->wq_timer_interval); 1820 1821 thread_call_t call = wq->wq_delayed_call; 1822 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED; 1823 uint64_t deadline = now + wq->wq_timer_interval; 1824 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) { 1825 panic("delayed_call was already enqueued"); 1826 } 1827 return true; 1828 } 1829 1830 static void 1831 workq_schedule_immediate_thread_creation(struct workqueue *wq) 1832 { 1833 assert(!preemption_enabled()); 1834 1835 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED, 1836 
WQ_IMMEDIATE_CALL_PENDED, 0)) { 1837 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1838 _wq_flags(wq), 0); 1839 1840 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED; 1841 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) { 1842 panic("immediate_call was already enqueued"); 1843 } 1844 } 1845 } 1846 1847 void 1848 workq_proc_suspended(struct proc *p) 1849 { 1850 struct workqueue *wq = proc_get_wqptr(p); 1851 1852 if (wq) { 1853 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed); 1854 } 1855 } 1856 1857 void 1858 workq_proc_resumed(struct proc *p) 1859 { 1860 struct workqueue *wq = proc_get_wqptr(p); 1861 uint32_t wq_flags; 1862 1863 if (!wq) { 1864 return; 1865 } 1866 1867 wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED | 1868 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed); 1869 if ((wq_flags & WQ_EXITING) == 0) { 1870 disable_preemption(); 1871 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) { 1872 workq_schedule_immediate_thread_creation(wq); 1873 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) { 1874 workq_schedule_delayed_thread_creation(wq, 1875 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART); 1876 } 1877 enable_preemption(); 1878 } 1879 } 1880 1881 /** 1882 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now 1883 */ 1884 static bool 1885 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp) 1886 { 1887 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed); 1888 if (now <= lastblocked_ts) { 1889 /* 1890 * Because the update of the timestamp when a thread blocks 1891 * isn't serialized against us looking at it (i.e. we don't hold 1892 * the workq lock), it's possible to have a timestamp that matches 1893 * the current time or that even looks to be in the future relative 1894 * to when we grabbed the current time... 1895 * 1896 * Just treat this as a busy thread since it must have just blocked. 1897 */ 1898 return true; 1899 } 1900 return (now - lastblocked_ts) < wq_stalled_window.abstime; 1901 } 1902 1903 static void 1904 workq_add_new_threads_call(void *_p, void *flags) 1905 { 1906 proc_t p = _p; 1907 struct workqueue *wq = proc_get_wqptr(p); 1908 uint32_t my_flag = (uint32_t)(uintptr_t)flags; 1909 1910 /* 1911 * workq_exit() will set the workqueue to NULL before 1912 * it cancels thread calls. 
1913 */ 1914 if (!wq) { 1915 return; 1916 } 1917 1918 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) || 1919 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED)); 1920 1921 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq), 1922 wq->wq_nthreads, wq->wq_thidlecount); 1923 1924 workq_lock_spin(wq); 1925 1926 wq->wq_thread_call_last_run = mach_absolute_time(); 1927 os_atomic_andnot(&wq->wq_flags, my_flag, release); 1928 1929 /* This can drop the workqueue lock, and take it again */ 1930 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 1931 1932 workq_unlock(wq); 1933 1934 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0, 1935 wq->wq_nthreads, wq->wq_thidlecount); 1936 } 1937 1938 #pragma mark thread state tracking 1939 1940 static void 1941 workq_sched_callback(int type, thread_t thread) 1942 { 1943 thread_ro_t tro = get_thread_ro(thread); 1944 struct uthread *uth = get_bsdthread_info(thread); 1945 struct workqueue *wq = proc_get_wqptr(tro->tro_proc); 1946 thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket; 1947 wq_thactive_t old_thactive; 1948 bool start_timer = false; 1949 1950 if (qos == WORKQ_THREAD_QOS_MANAGER) { 1951 return; 1952 } 1953 1954 switch (type) { 1955 case SCHED_CALL_BLOCK: 1956 old_thactive = _wq_thactive_dec(wq, qos); 1957 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive); 1958 1959 /* 1960 * Remember the timestamp of the last thread that blocked in this 1961 * bucket, it is used by admission checks to ignore one thread 1962 * being inactive if this timestamp is recent enough. 1963 * 1964 * If we collide with another thread trying to update the 1965 * last_blocked (really unlikely since another thread would have to 1966 * get scheduled and then block after we start down this path), it's 1967 * not a problem. Either timestamp is adequate, so no need to retry. 1968 */ 1969 os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)], 1970 thread_last_run_time(thread), relaxed); 1971 1972 if (req_qos == THREAD_QOS_UNSPECIFIED) { 1973 /* 1974 * No pending request at the moment we could unblock, move on. 1975 */ 1976 } else if (qos < req_qos) { 1977 /* 1978 * The blocking thread is at a lower QoS than the highest currently 1979 * pending constrained request, nothing has to be redriven 1980 */ 1981 } else { 1982 uint32_t max_busycount, old_req_count; 1983 old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive, 1984 req_qos, NULL, &max_busycount); 1985 /* 1986 * If it is possible that may_start_constrained_thread had refused 1987 * admission due to being over the max concurrency, we may need to 1988 * spin up a new thread. 1989 * 1990 * We take into account the maximum number of busy threads 1991 * that can affect may_start_constrained_thread, as looking at the 1992 * actual number may_start_constrained_thread will see is racy. 1993 * 1994 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is 1995 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
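 *
 * Worked through the check below with those illustrative numbers:
 * conc = wq_max_parallelism[bucket] = 4 and max_busycount = 2, so the
 * redrive condition (old_req_count <= conc && conc <= old_req_count +
 * max_busycount) holds exactly for old_req_count in [2, 4]. Outside that
 * window, either admission was never being refused on concurrency grounds
 * (active + busy stayed below conc), or the bucket is still at or above
 * conc even after this block, so scheduling the timer would not help.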
1996 */ 1997 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)]; 1998 if (old_req_count <= conc && conc <= old_req_count + max_busycount) { 1999 start_timer = workq_schedule_delayed_thread_creation(wq, 0); 2000 } 2001 } 2002 if (__improbable(kdebug_enable)) { 2003 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 2004 old_thactive, qos, NULL, NULL); 2005 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq, 2006 old - 1, qos | (req_qos << 8), 2007 wq->wq_reqcount << 1 | start_timer); 2008 } 2009 break; 2010 2011 case SCHED_CALL_UNBLOCK: 2012 /* 2013 * we cannot take the workqueue_lock here... 2014 * an UNBLOCK can occur from a timer event which 2015 * is run from an interrupt context... if the workqueue_lock 2016 * is already held by this processor, we'll deadlock... 2017 * the thread lock for the thread being UNBLOCKED 2018 * is also held 2019 */ 2020 old_thactive = _wq_thactive_inc(wq, qos); 2021 if (__improbable(kdebug_enable)) { 2022 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 2023 old_thactive, qos, NULL, NULL); 2024 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive); 2025 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq, 2026 old + 1, qos | (req_qos << 8), 2027 wq->wq_threads_scheduled); 2028 } 2029 break; 2030 } 2031 } 2032 2033 #pragma mark workq lifecycle 2034 2035 void 2036 workq_reference(struct workqueue *wq) 2037 { 2038 os_ref_retain(&wq->wq_refcnt); 2039 } 2040 2041 static void 2042 workq_deallocate_queue_invoke(mpsc_queue_chain_t e, 2043 __assert_only mpsc_daemon_queue_t dq) 2044 { 2045 struct workqueue *wq; 2046 struct turnstile *ts; 2047 2048 wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link); 2049 assert(dq == &workq_deallocate_queue); 2050 2051 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS); 2052 assert(ts); 2053 turnstile_cleanup(); 2054 turnstile_deallocate(ts); 2055 2056 lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp); 2057 zfree(workq_zone_workqueue, wq); 2058 } 2059 2060 static void 2061 workq_deallocate(struct workqueue *wq) 2062 { 2063 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) { 2064 workq_deallocate_queue_invoke(&wq->wq_destroy_link, 2065 &workq_deallocate_queue); 2066 } 2067 } 2068 2069 void 2070 workq_deallocate_safe(struct workqueue *wq) 2071 { 2072 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) { 2073 mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link, 2074 MPSC_QUEUE_DISABLE_PREEMPTION); 2075 } 2076 } 2077 2078 /** 2079 * Setup per-process state for the workqueue. 
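 *
 * The very first open also initializes the one-time global limits computed
 * below. As a purely illustrative example with hypothetical hardware, a
 * machine where ml_wait_max_cpus() reports 8 CPUs gets a constrained pool
 * limit of 8 * WORKQUEUE_CONSTRAINED_FACTOR (kept only if it exceeds the
 * wq_max_constrained_threads default) and
 * wq_death_max_load = fls(8) + 1 = 5.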
2080 */ 2081 int 2082 workq_open(struct proc *p, __unused struct workq_open_args *uap, 2083 __unused int32_t *retval) 2084 { 2085 struct workqueue *wq; 2086 int error = 0; 2087 2088 if ((p->p_lflag & P_LREGISTER) == 0) { 2089 return EINVAL; 2090 } 2091 2092 if (wq_init_constrained_limit) { 2093 uint32_t limit, num_cpus = ml_wait_max_cpus(); 2094 2095 /* 2096 * set up the limit for the constrained pool 2097 * this is a virtual pool in that we don't 2098 * maintain it on a separate idle and run list 2099 */ 2100 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR; 2101 2102 if (limit > wq_max_constrained_threads) { 2103 wq_max_constrained_threads = limit; 2104 } 2105 2106 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) { 2107 wq_max_threads = WQ_THACTIVE_BUCKET_HALF; 2108 } 2109 if (wq_max_threads > CONFIG_THREAD_MAX - 20) { 2110 wq_max_threads = CONFIG_THREAD_MAX - 20; 2111 } 2112 2113 wq_death_max_load = (uint16_t)fls(num_cpus) + 1; 2114 2115 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) { 2116 wq_max_parallelism[_wq_bucket(qos)] = 2117 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL); 2118 } 2119 2120 wq_max_cooperative_threads = num_cpus; 2121 2122 wq_init_constrained_limit = 0; 2123 } 2124 2125 if (proc_get_wqptr(p) == NULL) { 2126 if (proc_init_wqptr_or_wait(p) == FALSE) { 2127 assert(proc_get_wqptr(p) != NULL); 2128 goto out; 2129 } 2130 2131 wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO); 2132 2133 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1); 2134 2135 // Start the event manager at the priority hinted at by the policy engine 2136 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task()); 2137 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0); 2138 wq->wq_event_manager_priority = (uint32_t)pp; 2139 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 2140 wq->wq_proc = p; 2141 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(), 2142 TURNSTILE_WORKQS); 2143 2144 TAILQ_INIT(&wq->wq_thrunlist); 2145 TAILQ_INIT(&wq->wq_thnewlist); 2146 TAILQ_INIT(&wq->wq_thidlelist); 2147 priority_queue_init(&wq->wq_overcommit_queue); 2148 priority_queue_init(&wq->wq_constrained_queue); 2149 priority_queue_init(&wq->wq_special_queue); 2150 for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) { 2151 STAILQ_INIT(&wq->wq_cooperative_queue[bucket]); 2152 } 2153 2154 /* We are only using the delayed thread call for the constrained pool 2155 * which can't have work at >= UI QoS and so we can be fine with a 2156 * UI QoS thread call. 2157 */ 2158 wq->wq_delayed_call = thread_call_allocate_with_qos( 2159 workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE, 2160 THREAD_CALL_OPTIONS_ONCE); 2161 wq->wq_immediate_call = thread_call_allocate_with_options( 2162 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL, 2163 THREAD_CALL_OPTIONS_ONCE); 2164 wq->wq_death_call = thread_call_allocate_with_options( 2165 workq_kill_old_threads_call, wq, 2166 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE); 2167 2168 lck_ticket_init(&wq->wq_lock, &workq_lck_grp); 2169 2170 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq, 2171 VM_KERNEL_ADDRHIDE(wq), 0, 0); 2172 proc_set_wqptr(p, wq); 2173 } 2174 out: 2175 2176 return error; 2177 } 2178 2179 /* 2180 * Routine: workq_mark_exiting 2181 * 2182 * Function: Mark the work queue such that new threads will not be added to the 2183 * work queue after we return. 2184 * 2185 * Conditions: Called against the current process. 
2186 */ 2187 void 2188 workq_mark_exiting(struct proc *p) 2189 { 2190 struct workqueue *wq = proc_get_wqptr(p); 2191 uint32_t wq_flags; 2192 workq_threadreq_t mgr_req; 2193 2194 if (!wq) { 2195 return; 2196 } 2197 2198 WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0); 2199 2200 workq_lock_spin(wq); 2201 2202 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed); 2203 if (__improbable(wq_flags & WQ_EXITING)) { 2204 panic("workq_mark_exiting called twice"); 2205 } 2206 2207 /* 2208 * Opportunistically try to cancel thread calls that are likely in flight. 2209 * workq_exit() will do the proper cleanup. 2210 */ 2211 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) { 2212 thread_call_cancel(wq->wq_immediate_call); 2213 } 2214 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) { 2215 thread_call_cancel(wq->wq_delayed_call); 2216 } 2217 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) { 2218 thread_call_cancel(wq->wq_death_call); 2219 } 2220 2221 mgr_req = wq->wq_event_manager_threadreq; 2222 wq->wq_event_manager_threadreq = NULL; 2223 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */ 2224 wq->wq_creator = NULL; 2225 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 2226 2227 workq_unlock(wq); 2228 2229 if (mgr_req) { 2230 kqueue_threadreq_cancel(p, mgr_req); 2231 } 2232 /* 2233 * No one touches the priority queues once WQ_EXITING is set. 2234 * It is hence safe to do the tear down without holding any lock. 2235 */ 2236 priority_queue_destroy(&wq->wq_overcommit_queue, 2237 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2238 workq_threadreq_destroy(p, e); 2239 }); 2240 priority_queue_destroy(&wq->wq_constrained_queue, 2241 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2242 workq_threadreq_destroy(p, e); 2243 }); 2244 priority_queue_destroy(&wq->wq_special_queue, 2245 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2246 workq_threadreq_destroy(p, e); 2247 }); 2248 2249 WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0); 2250 } 2251 2252 /* 2253 * Routine: workq_exit 2254 * 2255 * Function: clean up the work queue structure(s) now that there are no threads 2256 * left running inside the work queue (except possibly current_thread). 2257 * 2258 * Conditions: Called by the last thread in the process. 2259 * Called against current process. 2260 */ 2261 void 2262 workq_exit(struct proc *p) 2263 { 2264 struct workqueue *wq; 2265 struct uthread *uth, *tmp; 2266 2267 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed); 2268 if (wq != NULL) { 2269 thread_t th = current_thread(); 2270 2271 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0); 2272 2273 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) { 2274 /* 2275 * <rdar://problem/40111515> Make sure we will no longer call the 2276 * sched call, if we ever block this thread, which the cancel_wait 2277 * below can do. 2278 */ 2279 thread_sched_call(th, NULL); 2280 } 2281 2282 /* 2283 * Thread calls are always scheduled by the proc itself or under the 2284 * workqueue spinlock if WQ_EXITING is not yet set. 2285 * 2286 * Either way, when this runs, the proc has no threads left beside 2287 * the one running this very code, so we know no thread call can be 2288 * dispatched anymore. 
2289 */ 2290 thread_call_cancel_wait(wq->wq_delayed_call); 2291 thread_call_cancel_wait(wq->wq_immediate_call); 2292 thread_call_cancel_wait(wq->wq_death_call); 2293 thread_call_free(wq->wq_delayed_call); 2294 thread_call_free(wq->wq_immediate_call); 2295 thread_call_free(wq->wq_death_call); 2296 2297 /* 2298 * Clean up workqueue data structures for threads that exited and 2299 * didn't get a chance to clean up after themselves. 2300 * 2301 * idle/new threads should have been interrupted and died on their own 2302 */ 2303 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) { 2304 thread_t mth = get_machthread(uth); 2305 thread_sched_call(mth, NULL); 2306 thread_deallocate(mth); 2307 } 2308 assert(TAILQ_EMPTY(&wq->wq_thnewlist)); 2309 assert(TAILQ_EMPTY(&wq->wq_thidlelist)); 2310 2311 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq, 2312 VM_KERNEL_ADDRHIDE(wq), 0, 0); 2313 2314 workq_deallocate(wq); 2315 2316 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0); 2317 } 2318 } 2319 2320 2321 #pragma mark bsd thread control 2322 2323 bool 2324 bsdthread_part_of_cooperative_workqueue(struct uthread *uth) 2325 { 2326 return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) && 2327 (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER) && 2328 (!workq_thread_is_permanently_bound(uth)); 2329 } 2330 2331 static bool 2332 _pthread_priority_to_policy(pthread_priority_t priority, 2333 thread_qos_policy_data_t *data) 2334 { 2335 data->qos_tier = _pthread_priority_thread_qos(priority); 2336 data->tier_importance = _pthread_priority_relpri(priority); 2337 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 || 2338 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) { 2339 return false; 2340 } 2341 return true; 2342 } 2343 2344 static int 2345 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority, 2346 mach_port_name_t voucher, enum workq_set_self_flags flags) 2347 { 2348 struct uthread *uth = get_bsdthread_info(th); 2349 struct workqueue *wq = proc_get_wqptr(p); 2350 2351 kern_return_t kr; 2352 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0; 2353 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE); 2354 2355 assert(th == current_thread()); 2356 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) { 2357 if (!is_wq_thread) { 2358 unbind_rv = EINVAL; 2359 goto qos; 2360 } 2361 2362 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 2363 unbind_rv = EINVAL; 2364 goto qos; 2365 } 2366 2367 workq_threadreq_t kqr = uth->uu_kqr_bound; 2368 if (kqr == NULL) { 2369 unbind_rv = EALREADY; 2370 goto qos; 2371 } 2372 2373 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2374 unbind_rv = EINVAL; 2375 goto qos; 2376 } 2377 2378 kqueue_threadreq_unbind(p, kqr); 2379 } 2380 2381 qos: 2382 if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) { 2383 assert(flags & WORKQ_SET_SELF_QOS_FLAG); 2384 2385 thread_qos_policy_data_t new_policy; 2386 thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED; 2387 2388 if (!_pthread_priority_to_policy(priority, &new_policy)) { 2389 qos_rv = EINVAL; 2390 goto voucher; 2391 } 2392 2393 if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) { 2394 /* 2395 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely 2396 * should have an override QoS in the pthread_priority_t and we should 2397 * only come into this path for cooperative thread requests 2398 */ 2399 if (!_pthread_priority_has_override_qos(priority) || 2400 
!_pthread_priority_is_cooperative(priority)) { 2401 qos_rv = EINVAL; 2402 goto voucher; 2403 } 2404 qos_override = _pthread_priority_thread_override_qos(priority); 2405 } else { 2406 /* 2407 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely 2408 * should not have an override QoS in the pthread_priority_t 2409 */ 2410 if (_pthread_priority_has_override_qos(priority)) { 2411 qos_rv = EINVAL; 2412 goto voucher; 2413 } 2414 } 2415 2416 if (!is_wq_thread) { 2417 /* 2418 * Threads opted out of QoS can't change QoS 2419 */ 2420 if (!thread_has_qos_policy(th)) { 2421 qos_rv = EPERM; 2422 goto voucher; 2423 } 2424 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER || 2425 uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) { 2426 /* 2427 * Workqueue manager threads or threads above UI can't change QoS 2428 */ 2429 qos_rv = EINVAL; 2430 goto voucher; 2431 } else { 2432 /* 2433 * For workqueue threads, possibly adjust buckets and redrive thread 2434 * requests. 2435 * 2436 * Transitions allowed: 2437 * 2438 * overcommit --> non-overcommit 2439 * overcommit --> overcommit 2440 * non-overcommit --> non-overcommit 2441 * non-overcommit --> overcommit (to be deprecated later) 2442 * cooperative --> cooperative 2443 * 2444 * All other transitions aren't allowed so reject them. 2445 */ 2446 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) { 2447 qos_rv = EINVAL; 2448 goto voucher; 2449 } else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) { 2450 qos_rv = EINVAL; 2451 goto voucher; 2452 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) { 2453 qos_rv = EINVAL; 2454 goto voucher; 2455 } 2456 2457 struct uu_workq_policy old_pri, new_pri; 2458 bool force_run = false; 2459 2460 if (qos_override) { 2461 /* 2462 * We're in the case of a thread clarifying that it is for eg. not IN 2463 * req QoS but rather, UT req QoS with IN override. However, this can 2464 * race with a concurrent override happening to the thread via 2465 * workq_thread_add_dispatch_override so this needs to be 2466 * synchronized with the thread mutex. 2467 */ 2468 thread_mtx_lock(th); 2469 } 2470 2471 workq_lock_spin(wq); 2472 2473 old_pri = new_pri = uth->uu_workq_pri; 2474 new_pri.qos_req = (thread_qos_t)new_policy.qos_tier; 2475 2476 if (old_pri.qos_override < qos_override) { 2477 /* 2478 * Since this can race with a concurrent override via 2479 * workq_thread_add_dispatch_override, only adjust override value if we 2480 * are higher - this is a saturating function. 2481 * 2482 * We should not be changing the final override values, we should simply 2483 * be redistributing the current value with a different breakdown of req 2484 * vs override QoS - assert to that effect. Therefore, buckets should 2485 * not change. 
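 *
 * Concrete (hypothetical) redistribution, assuming workq_pri_override()
 * reports the effective maximum of the requested and override QoS: a
 * thread recorded as qos_req = IN with no override asks to become
 * qos_req = UT with qos_override = IN. The effective override is IN both
 * before and after, and the bucket stays the IN bucket, which is exactly
 * what the two asserts below check.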
2486 */ 2487 new_pri.qos_override = qos_override; 2488 assert(workq_pri_override(new_pri) == workq_pri_override(old_pri)); 2489 assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri)); 2490 } 2491 2492 /* Adjust schedule counts for various types of transitions */ 2493 2494 /* overcommit -> non-overcommit */ 2495 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) { 2496 workq_thread_set_type(uth, 0); 2497 wq->wq_constrained_threads_scheduled++; 2498 2499 /* non-overcommit -> overcommit */ 2500 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) { 2501 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT); 2502 force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads); 2503 2504 /* cooperative -> cooperative */ 2505 } else if (workq_thread_is_cooperative(uth)) { 2506 _wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req); 2507 _wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req); 2508 2509 /* We're changing schedule counts within cooperative pool, we 2510 * need to refresh best cooperative QoS logic again */ 2511 force_run = _wq_cooperative_queue_refresh_best_req_qos(wq); 2512 } 2513 2514 /* 2515 * This will set up an override on the thread if any and will also call 2516 * schedule_creator if needed 2517 */ 2518 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run); 2519 workq_unlock(wq); 2520 2521 if (qos_override) { 2522 thread_mtx_unlock(th); 2523 } 2524 2525 if (workq_thread_is_overcommit(uth)) { 2526 thread_disarm_workqueue_quantum(th); 2527 } else { 2528 /* If the thread changed QoS buckets, the quantum duration 2529 * may have changed too */ 2530 thread_arm_workqueue_quantum(th); 2531 } 2532 } 2533 2534 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY, 2535 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT); 2536 if (kr != KERN_SUCCESS) { 2537 qos_rv = EINVAL; 2538 } 2539 } 2540 2541 voucher: 2542 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) { 2543 kr = thread_set_voucher_name(voucher); 2544 if (kr != KERN_SUCCESS) { 2545 voucher_rv = ENOENT; 2546 goto fixedpri; 2547 } 2548 } 2549 2550 fixedpri: 2551 if (qos_rv) { 2552 goto done; 2553 } 2554 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) { 2555 thread_extended_policy_data_t extpol = {.timeshare = 0}; 2556 2557 if (is_wq_thread) { 2558 /* Not allowed on workqueue threads */ 2559 fixedpri_rv = ENOTSUP; 2560 goto done; 2561 } 2562 2563 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 2564 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 2565 if (kr != KERN_SUCCESS) { 2566 fixedpri_rv = EINVAL; 2567 goto done; 2568 } 2569 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) { 2570 thread_extended_policy_data_t extpol = {.timeshare = 1}; 2571 2572 if (is_wq_thread) { 2573 /* Not allowed on workqueue threads */ 2574 fixedpri_rv = ENOTSUP; 2575 goto done; 2576 } 2577 2578 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 2579 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 2580 if (kr != KERN_SUCCESS) { 2581 fixedpri_rv = EINVAL; 2582 goto done; 2583 } 2584 } 2585 2586 done: 2587 if (qos_rv && voucher_rv) { 2588 /* Both failed, give that a unique error. 
*/ 2589 return EBADMSG; 2590 } 2591 2592 if (unbind_rv) { 2593 return unbind_rv; 2594 } 2595 2596 if (qos_rv) { 2597 return qos_rv; 2598 } 2599 2600 if (voucher_rv) { 2601 return voucher_rv; 2602 } 2603 2604 if (fixedpri_rv) { 2605 return fixedpri_rv; 2606 } 2607 2608 2609 return 0; 2610 } 2611 2612 static int 2613 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport, 2614 pthread_priority_t pp, user_addr_t resource) 2615 { 2616 thread_qos_t qos = _pthread_priority_thread_qos(pp); 2617 if (qos == THREAD_QOS_UNSPECIFIED) { 2618 return EINVAL; 2619 } 2620 2621 thread_t th = port_name_to_thread(kport, 2622 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2623 if (th == THREAD_NULL) { 2624 return ESRCH; 2625 } 2626 2627 int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE, 2628 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 2629 2630 thread_deallocate(th); 2631 return rv; 2632 } 2633 2634 static int 2635 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport, 2636 user_addr_t resource) 2637 { 2638 thread_t th = port_name_to_thread(kport, 2639 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2640 if (th == THREAD_NULL) { 2641 return ESRCH; 2642 } 2643 2644 int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource, 2645 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 2646 2647 thread_deallocate(th); 2648 return rv; 2649 } 2650 2651 static int 2652 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport, 2653 pthread_priority_t pp, user_addr_t ulock_addr) 2654 { 2655 struct uu_workq_policy old_pri, new_pri; 2656 struct workqueue *wq = proc_get_wqptr(p); 2657 2658 thread_qos_t qos_override = _pthread_priority_thread_qos(pp); 2659 if (qos_override == THREAD_QOS_UNSPECIFIED) { 2660 return EINVAL; 2661 } 2662 2663 thread_t thread = port_name_to_thread(kport, 2664 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2665 if (thread == THREAD_NULL) { 2666 return ESRCH; 2667 } 2668 2669 struct uthread *uth = get_bsdthread_info(thread); 2670 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 2671 thread_deallocate(thread); 2672 return EPERM; 2673 } 2674 2675 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE, 2676 wq, thread_tid(thread), 1, pp); 2677 2678 thread_mtx_lock(thread); 2679 2680 if (ulock_addr) { 2681 uint32_t val; 2682 int rc; 2683 /* 2684 * Workaround lack of explicit support for 'no-fault copyin' 2685 * <rdar://problem/24999882>, as disabling preemption prevents paging in 2686 */ 2687 disable_preemption(); 2688 rc = copyin_atomic32(ulock_addr, &val); 2689 enable_preemption(); 2690 if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) { 2691 goto out; 2692 } 2693 } 2694 2695 workq_lock_spin(wq); 2696 2697 old_pri = uth->uu_workq_pri; 2698 if (old_pri.qos_override >= qos_override) { 2699 /* Nothing to do */ 2700 } else if (thread == current_thread()) { 2701 new_pri = old_pri; 2702 new_pri.qos_override = qos_override; 2703 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 2704 } else { 2705 uth->uu_workq_pri.qos_override = qos_override; 2706 if (qos_override > workq_pri_override(old_pri)) { 2707 thread_set_workq_override(thread, qos_override); 2708 } 2709 } 2710 2711 workq_unlock(wq); 2712 2713 out: 2714 thread_mtx_unlock(thread); 2715 thread_deallocate(thread); 2716 return 0; 2717 } 2718 2719 static int 2720 workq_thread_reset_dispatch_override(proc_t p, thread_t thread) 2721 { 2722 struct uu_workq_policy old_pri, new_pri; 2723 struct workqueue *wq = proc_get_wqptr(p); 2724 struct uthread *uth = 
get_bsdthread_info(thread); 2725 2726 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 2727 return EPERM; 2728 } 2729 2730 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0); 2731 2732 /* 2733 * workq_thread_add_dispatch_override takes the thread mutex before doing the 2734 * copyin to validate the drainer and apply the override. We need to do the 2735 * same here. See rdar://84472518 2736 */ 2737 thread_mtx_lock(thread); 2738 2739 workq_lock_spin(wq); 2740 old_pri = new_pri = uth->uu_workq_pri; 2741 new_pri.qos_override = THREAD_QOS_UNSPECIFIED; 2742 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 2743 workq_unlock(wq); 2744 2745 thread_mtx_unlock(thread); 2746 return 0; 2747 } 2748 2749 static int 2750 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable) 2751 { 2752 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) { 2753 // If the thread isn't a workqueue thread, don't set the 2754 // kill_allowed bit; however, we still need to return 0 2755 // instead of an error code since this code is executed 2756 // on the abort path which needs to not depend on the 2757 // pthread_t (returning an error depends on pthread_t via 2758 // cerror_nocancel) 2759 return 0; 2760 } 2761 struct uthread *uth = get_bsdthread_info(thread); 2762 uth->uu_workq_pthread_kill_allowed = enable; 2763 return 0; 2764 } 2765 2766 static int 2767 workq_allow_sigmask(proc_t p, sigset_t mask) 2768 { 2769 if (mask & workq_threadmask) { 2770 return EINVAL; 2771 } 2772 2773 proc_lock(p); 2774 p->p_workq_allow_sigmask |= mask; 2775 proc_unlock(p); 2776 2777 return 0; 2778 } 2779 2780 static int 2781 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags, 2782 int *retval) 2783 { 2784 static_assert(QOS_PARALLELISM_COUNT_LOGICAL == 2785 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical"); 2786 static_assert(QOS_PARALLELISM_REALTIME == 2787 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime"); 2788 static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE == 2789 _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource"); 2790 2791 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) { 2792 return EINVAL; 2793 } 2794 2795 /* No units are present */ 2796 if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) { 2797 return ENOTSUP; 2798 } 2799 2800 if (flags & QOS_PARALLELISM_REALTIME) { 2801 if (qos) { 2802 return EINVAL; 2803 } 2804 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) { 2805 return EINVAL; 2806 } 2807 2808 *retval = qos_max_parallelism(qos, flags); 2809 return 0; 2810 } 2811 2812 static int 2813 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread, 2814 unsigned long flags, uint64_t value1, __unused uint64_t value2) 2815 { 2816 uint32_t apply_worker_index; 2817 kern_return_t kr; 2818 2819 switch (flags) { 2820 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET: 2821 apply_worker_index = (uint32_t)value1; 2822 kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH); 2823 /* 2824 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a 2825 * cluster which it was not eligible to execute on. 2826 */ 2827 return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? 
ENOTSUP : EINVAL); 2828 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR: 2829 kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH); 2830 return (kr == KERN_SUCCESS) ? 0 : EINVAL; 2831 default: 2832 return EINVAL; 2833 } 2834 } 2835 2836 #define ENSURE_UNUSED(arg) \ 2837 ({ if ((arg) != 0) { return EINVAL; } }) 2838 2839 int 2840 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval) 2841 { 2842 switch (uap->cmd) { 2843 case BSDTHREAD_CTL_QOS_OVERRIDE_START: 2844 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1, 2845 (pthread_priority_t)uap->arg2, uap->arg3); 2846 case BSDTHREAD_CTL_QOS_OVERRIDE_END: 2847 ENSURE_UNUSED(uap->arg3); 2848 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1, 2849 (user_addr_t)uap->arg2); 2850 2851 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH: 2852 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1, 2853 (pthread_priority_t)uap->arg2, uap->arg3); 2854 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET: 2855 return workq_thread_reset_dispatch_override(p, current_thread()); 2856 2857 case BSDTHREAD_CTL_SET_SELF: 2858 return bsdthread_set_self(p, current_thread(), 2859 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2, 2860 (enum workq_set_self_flags)uap->arg3); 2861 2862 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM: 2863 ENSURE_UNUSED(uap->arg3); 2864 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1, 2865 (unsigned long)uap->arg2, retval); 2866 case BSDTHREAD_CTL_WORKQ_ALLOW_KILL: 2867 ENSURE_UNUSED(uap->arg2); 2868 ENSURE_UNUSED(uap->arg3); 2869 return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1); 2870 case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR: 2871 return bsdthread_dispatch_apply_attr(p, current_thread(), 2872 (unsigned long)uap->arg1, (uint64_t)uap->arg2, 2873 (uint64_t)uap->arg3); 2874 case BSDTHREAD_CTL_WORKQ_ALLOW_SIGMASK: 2875 return workq_allow_sigmask(p, (int)uap->arg1); 2876 case BSDTHREAD_CTL_SET_QOS: 2877 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD: 2878 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET: 2879 /* no longer supported */ 2880 return ENOTSUP; 2881 2882 default: 2883 return EINVAL; 2884 } 2885 } 2886 2887 #pragma mark workqueue thread manipulation 2888 2889 static void __dead2 2890 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2891 struct uthread *uth, uint32_t setup_flags); 2892 2893 static void __dead2 2894 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2895 struct uthread *uth, uint32_t setup_flags); 2896 2897 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2; 2898 2899 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD 2900 static inline uint64_t 2901 workq_trace_req_id(workq_threadreq_t req) 2902 { 2903 struct kqworkloop *kqwl; 2904 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2905 kqwl = __container_of(req, struct kqworkloop, kqwl_request); 2906 return kqwl->kqwl_dynamicid; 2907 } 2908 2909 return VM_KERNEL_ADDRHIDE(req); 2910 } 2911 #endif 2912 2913 /** 2914 * Entry point for libdispatch to ask for threads 2915 */ 2916 static int 2917 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative) 2918 { 2919 thread_qos_t qos = _pthread_priority_thread_qos(pp); 2920 struct workqueue *wq = proc_get_wqptr(p); 2921 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI; 2922 int ret = 0; 2923 2924 if (wq == NULL || reqcount <= 0 || 
reqcount > UINT16_MAX || 2925 qos == THREAD_QOS_UNSPECIFIED) { 2926 ret = EINVAL; 2927 goto exit; 2928 } 2929 2930 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE, 2931 wq, reqcount, pp, cooperative); 2932 2933 workq_threadreq_t req = zalloc(workq_zone_threadreq); 2934 priority_queue_entry_init(&req->tr_entry); 2935 req->tr_state = WORKQ_TR_STATE_NEW; 2936 req->tr_qos = qos; 2937 workq_tr_flags_t tr_flags = 0; 2938 2939 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) { 2940 tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT; 2941 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 2942 } 2943 2944 if (cooperative) { 2945 tr_flags |= WORKQ_TR_FLAG_COOPERATIVE; 2946 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE; 2947 2948 if (reqcount > 1) { 2949 ret = ENOTSUP; 2950 goto free_and_exit; 2951 } 2952 } 2953 2954 /* A thread request cannot be both overcommit and cooperative */ 2955 if (workq_tr_is_cooperative(tr_flags) && 2956 workq_tr_is_overcommit(tr_flags)) { 2957 ret = EINVAL; 2958 goto free_and_exit; 2959 } 2960 req->tr_flags = tr_flags; 2961 2962 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, 2963 wq, workq_trace_req_id(req), req->tr_qos, reqcount); 2964 2965 workq_lock_spin(wq); 2966 do { 2967 if (_wq_exiting(wq)) { 2968 goto unlock_and_exit; 2969 } 2970 2971 /* 2972 * When userspace is asking for parallelism, wakeup up to (reqcount - 1) 2973 * threads without pacing, to inform the scheduler of that workload. 2974 * 2975 * The last requests, or the ones that failed the admission checks are 2976 * enqueued and go through the regular creator codepath. 2977 * 2978 * If there aren't enough threads, add one, but re-evaluate everything 2979 * as conditions may now have changed. 2980 */ 2981 unpaced = reqcount - 1; 2982 2983 if (reqcount > 1) { 2984 /* We don't handle asking for parallelism on the cooperative 2985 * workqueue just yet */ 2986 assert(!workq_threadreq_is_cooperative(req)); 2987 2988 if (workq_threadreq_is_nonovercommit(req)) { 2989 unpaced = workq_constrained_allowance(wq, qos, NULL, false, true); 2990 if (unpaced >= reqcount - 1) { 2991 unpaced = reqcount - 1; 2992 } 2993 } 2994 } 2995 2996 /* 2997 * This path does not currently handle custom workloop parameters 2998 * when creating threads for parallelism. 
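 *
 * Purely illustrative trace of the pacing accounting around the
 * direct-bind loop below (hypothetical numbers): an overcommit request
 * with reqcount = 3 arriving while one idle thread is cached binds that
 * thread immediately (reqcount drops to 2), then, provided
 * wq_nthreads < wq_max_threads, grows the pool by one thread and binds
 * it on the next pass (reqcount drops to 1); the final request is
 * enqueued with tr_count = 1 and is paced through the creator path.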
2999 */ 3000 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)); 3001 3002 /* 3003 * This is a trimmed down version of workq_threadreq_bind_and_unlock() 3004 */ 3005 while (unpaced > 0 && wq->wq_thidlecount) { 3006 struct uthread *uth; 3007 bool needs_wakeup; 3008 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND; 3009 3010 if (workq_tr_is_overcommit(req->tr_flags)) { 3011 uu_flags |= UT_WORKQ_OVERCOMMIT; 3012 } 3013 3014 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup); 3015 3016 _wq_thactive_inc(wq, qos); 3017 wq->wq_thscheduled_count[_wq_bucket(qos)]++; 3018 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 3019 wq->wq_fulfilled++; 3020 3021 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 3022 uth->uu_save.uus_workq_park_data.thread_request = req; 3023 if (needs_wakeup) { 3024 workq_thread_wakeup(uth); 3025 } 3026 unpaced--; 3027 reqcount--; 3028 } 3029 } while (unpaced && wq->wq_nthreads < wq_max_threads && 3030 (workq_add_new_idle_thread(p, wq, workq_unpark_continue, 3031 false, NULL) == KERN_SUCCESS)); 3032 3033 if (_wq_exiting(wq)) { 3034 goto unlock_and_exit; 3035 } 3036 3037 req->tr_count = (uint16_t)reqcount; 3038 if (workq_threadreq_enqueue(wq, req)) { 3039 /* This can drop the workqueue lock, and take it again */ 3040 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 3041 } 3042 workq_unlock(wq); 3043 return 0; 3044 3045 unlock_and_exit: 3046 workq_unlock(wq); 3047 free_and_exit: 3048 zfree(workq_zone_threadreq, req); 3049 exit: 3050 return ret; 3051 } 3052 3053 bool 3054 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req, 3055 struct turnstile *workloop_ts, thread_qos_t qos, 3056 workq_kern_threadreq_flags_t flags) 3057 { 3058 struct workqueue *wq = proc_get_wqptr_fast(p); 3059 struct uthread *uth = NULL; 3060 3061 assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)); 3062 3063 /* 3064 * For any new initialization changes done to workqueue thread request below, 3065 * please also consider if they are relevant to permanently bound thread 3066 * request. See workq_kern_threadreq_permanent_bind. 3067 */ 3068 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 3069 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req); 3070 qos = thread_workq_qos_for_pri(trp.trp_pri); 3071 if (qos == THREAD_QOS_UNSPECIFIED) { 3072 qos = WORKQ_THREAD_QOS_ABOVEUI; 3073 } 3074 } 3075 3076 assert(req->tr_state == WORKQ_TR_STATE_IDLE); 3077 priority_queue_entry_init(&req->tr_entry); 3078 req->tr_count = 1; 3079 req->tr_state = WORKQ_TR_STATE_NEW; 3080 req->tr_qos = qos; 3081 3082 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq, 3083 workq_trace_req_id(req), qos, 1); 3084 3085 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) { 3086 /* 3087 * we're called back synchronously from the context of 3088 * kqueue_threadreq_unbind from within workq_thread_return() 3089 * we can try to match up this thread with this request ! 3090 */ 3091 uth = current_uthread(); 3092 assert(uth->uu_kqr_bound == NULL); 3093 } 3094 3095 workq_lock_spin(wq); 3096 if (_wq_exiting(wq)) { 3097 req->tr_state = WORKQ_TR_STATE_IDLE; 3098 workq_unlock(wq); 3099 return false; 3100 } 3101 3102 if (uth && workq_threadreq_admissible(wq, uth, req)) { 3103 /* This is the case of the rebind - we were about to park and unbind 3104 * when more events came so keep the binding. 
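 *
 * Illustrative sequence: a servicer thread comes back through
 * workq_thread_return(), kqueue_threadreq_unbind() finds more events
 * pending and re-enters workq_kern_threadreq_initiate() with
 * WORKQ_THREADREQ_ATTEMPT_REBIND set; when the admissibility check above
 * passes, the current thread is matched to the new request here instead
 * of parking.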
3105 */ 3106 assert(uth != wq->wq_creator); 3107 3108 if (uth->uu_workq_pri.qos_bucket != req->tr_qos) { 3109 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos); 3110 workq_thread_reset_pri(wq, uth, req, /*unpark*/ false); 3111 } 3112 /* 3113 * We're called from workq_kern_threadreq_initiate() 3114 * due to an unbind, with the kq req held. 3115 */ 3116 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 3117 workq_trace_req_id(req), req->tr_flags, 0); 3118 wq->wq_fulfilled++; 3119 3120 kqueue_threadreq_bind(p, req, get_machthread(uth), 0); 3121 } else { 3122 if (workloop_ts) { 3123 workq_perform_turnstile_operation_locked(wq, ^{ 3124 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile, 3125 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 3126 turnstile_update_inheritor_complete(workloop_ts, 3127 TURNSTILE_INTERLOCK_HELD); 3128 }); 3129 } 3130 3131 bool reevaluate_creator_thread_group = false; 3132 #if CONFIG_PREADOPT_TG 3133 reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG); 3134 #endif 3135 /* We enqueued the highest priority item or we may need to reevaluate if 3136 * the creator needs a thread group pre-adoption */ 3137 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) { 3138 workq_schedule_creator(p, wq, flags); 3139 } 3140 } 3141 3142 workq_unlock(wq); 3143 3144 return true; 3145 } 3146 3147 void 3148 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req, 3149 thread_qos_t qos, workq_kern_threadreq_flags_t flags) 3150 { 3151 struct workqueue *wq = proc_get_wqptr_fast(p); 3152 bool make_overcommit = false; 3153 3154 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 3155 /* Requests outside-of-QoS shouldn't accept modify operations */ 3156 return; 3157 } 3158 3159 workq_lock_spin(wq); 3160 3161 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 3162 assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)); 3163 3164 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 3165 kqueue_threadreq_bind(p, req, req->tr_thread, 0); 3166 workq_unlock(wq); 3167 return; 3168 } 3169 3170 if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) { 3171 /* TODO (rokhinip): We come into this code path for kqwl thread 3172 * requests. kqwl requests cannot be cooperative. 3173 */ 3174 assert(!workq_threadreq_is_cooperative(req)); 3175 3176 make_overcommit = workq_threadreq_is_nonovercommit(req); 3177 } 3178 3179 if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) { 3180 workq_unlock(wq); 3181 return; 3182 } 3183 3184 assert(req->tr_count == 1); 3185 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 3186 panic("Invalid thread request (%p) state %d", req, req->tr_state); 3187 } 3188 3189 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq, 3190 workq_trace_req_id(req), qos, 0); 3191 3192 struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req); 3193 workq_threadreq_t req_max; 3194 3195 /* 3196 * Stage 1: Dequeue the request from its priority queue. 3197 * 3198 * If we dequeue the root item of the constrained priority queue, 3199 * maintain the best constrained request qos invariant. 
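 *
 * For example, if this request was the IN root of the constrained queue
 * and a UT constrained request remains queued, the refresh below lowers
 * the recorded best constrained request QoS from IN to UT so that the
 * admission accounting sees the right bound.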
3200 */ 3201 if (priority_queue_remove(pq, &req->tr_entry)) { 3202 if (workq_threadreq_is_nonovercommit(req)) { 3203 _wq_thactive_refresh_best_constrained_req_qos(wq); 3204 } 3205 } 3206 3207 /* 3208 * Stage 2: Apply changes to the thread request 3209 * 3210 * If the item will not become the root of the priority queue it belongs to, 3211 * then we need to wait in line, just enqueue and return quickly. 3212 */ 3213 if (__improbable(make_overcommit)) { 3214 req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT; 3215 pq = workq_priority_queue_for_req(wq, req); 3216 } 3217 req->tr_qos = qos; 3218 3219 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry); 3220 if (req_max && req_max->tr_qos >= qos) { 3221 priority_queue_entry_set_sched_pri(pq, &req->tr_entry, 3222 workq_priority_for_req(req), false); 3223 priority_queue_insert(pq, &req->tr_entry); 3224 workq_unlock(wq); 3225 return; 3226 } 3227 3228 /* 3229 * Stage 3: Reevaluate whether we should run the thread request. 3230 * 3231 * Pretend the thread request is new again: 3232 * - adjust wq_reqcount to not count it anymore. 3233 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock 3234 * properly attempts a synchronous bind) 3235 */ 3236 wq->wq_reqcount--; 3237 req->tr_state = WORKQ_TR_STATE_NEW; 3238 3239 /* We enqueued the highest priority item or we may need to reevaluate if 3240 * the creator needs a thread group pre-adoption if the request got a new TG */ 3241 bool reevaluate_creator_tg = false; 3242 3243 #if CONFIG_PREADOPT_TG 3244 reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG); 3245 #endif 3246 3247 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) { 3248 workq_schedule_creator(p, wq, flags); 3249 } 3250 workq_unlock(wq); 3251 } 3252 3253 void 3254 workq_kern_bound_thread_reset_pri(workq_threadreq_t req, struct uthread *uth) 3255 { 3256 assert(workq_thread_is_permanently_bound(uth)); 3257 3258 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS)) { 3259 /* 3260 * For requests outside-of-QoS, we set the scheduling policy and 3261 * absolute priority for the bound thread right at the initialization 3262 * time. See workq_kern_threadreq_permanent_bind. 
3263 */ 3264 return; 3265 } 3266 3267 struct workqueue *wq = proc_get_wqptr_fast(current_proc()); 3268 if (req) { 3269 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 3270 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 3271 } else { 3272 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 3273 if (qos > WORKQ_THREAD_QOS_CLEANUP) { 3274 workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true); 3275 } else { 3276 uth->uu_save.uus_workq_park_data.qos = qos; 3277 } 3278 } 3279 } 3280 3281 void 3282 workq_kern_threadreq_lock(struct proc *p) 3283 { 3284 workq_lock_spin(proc_get_wqptr_fast(p)); 3285 } 3286 3287 void 3288 workq_kern_threadreq_unlock(struct proc *p) 3289 { 3290 workq_unlock(proc_get_wqptr_fast(p)); 3291 } 3292 3293 void 3294 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req, 3295 thread_t owner, struct turnstile *wl_ts, 3296 turnstile_update_flags_t flags) 3297 { 3298 struct workqueue *wq = proc_get_wqptr_fast(p); 3299 turnstile_inheritor_t inheritor; 3300 3301 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 3302 assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP); 3303 workq_lock_held(wq); 3304 3305 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 3306 kqueue_threadreq_bind(p, req, req->tr_thread, 3307 KQUEUE_THREADREQ_BIND_NO_INHERITOR_UPDATE); 3308 return; 3309 } 3310 3311 if (_wq_exiting(wq)) { 3312 inheritor = TURNSTILE_INHERITOR_NULL; 3313 } else { 3314 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 3315 panic("Invalid thread request (%p) state %d", req, req->tr_state); 3316 } 3317 3318 if (owner) { 3319 inheritor = owner; 3320 flags |= TURNSTILE_INHERITOR_THREAD; 3321 } else { 3322 inheritor = wq->wq_turnstile; 3323 flags |= TURNSTILE_INHERITOR_TURNSTILE; 3324 } 3325 } 3326 3327 workq_perform_turnstile_operation_locked(wq, ^{ 3328 turnstile_update_inheritor(wl_ts, inheritor, flags); 3329 }); 3330 } 3331 3332 /* 3333 * An entry point for kevent to request a newly created workqueue thread 3334 * and bind it permanently to the given workqueue thread request. 3335 * 3336 * It currently only supports fixed scheduler priority thread requests. 3337 * 3338 * The newly created thread counts towards wq_nthreads. This function returns 3339 * an error if we are above that limit. There is no concept of delayed thread 3340 * creation for such specially configured kqworkloops. 3341 * 3342 * If successful, the newly created thread will be parked in 3343 * workq_bound_thread_initialize_and_unpark_continue waiting for 3344 * new incoming events. 3345 */ 3346 kern_return_t 3347 workq_kern_threadreq_permanent_bind(struct proc *p, struct workq_threadreq_s *kqr) 3348 { 3349 kern_return_t ret = 0; 3350 thread_t new_thread = NULL; 3351 struct workqueue *wq = proc_get_wqptr_fast(p); 3352 3353 workq_lock_spin(wq); 3354 3355 if (wq->wq_nthreads >= wq_max_threads) { 3356 ret = EDOM; 3357 } else { 3358 if (kqr->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 3359 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(kqr); 3360 /* 3361 * For requests outside-of-QoS, we fully initialize the thread 3362 * request here followed by preadopting the scheduling properties 3363 * on the newly created bound thread. 3364 */ 3365 thread_qos_t qos = thread_workq_qos_for_pri(trp.trp_pri); 3366 if (qos == THREAD_QOS_UNSPECIFIED) { 3367 qos = WORKQ_THREAD_QOS_ABOVEUI; 3368 } 3369 kqr->tr_qos = qos; 3370 } 3371 kqr->tr_count = 1; 3372 3373 /* workq_lock dropped and retaken around thread creation below. 
*/ 3374 ret = workq_add_new_idle_thread(p, wq, 3375 workq_bound_thread_initialize_and_unpark_continue, 3376 true, &new_thread); 3377 if (ret == KERN_SUCCESS) { 3378 struct uthread *uth = get_bsdthread_info(new_thread); 3379 if (kqr->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 3380 workq_thread_reset_pri(wq, uth, kqr, /*unpark*/ true); 3381 } 3382 /* 3383 * The newly created thread goes through a full bind to the kqwl 3384 * right upon creation. 3385 * It then falls back to soft bind/unbind upon wakeup/park. 3386 */ 3387 kqueue_threadreq_bind_prepost(p, kqr, uth); 3388 uth->uu_workq_flags |= UT_WORKQ_PERMANENT_BIND; 3389 } 3390 } 3391 3392 workq_unlock(wq); 3393 3394 if (ret == KERN_SUCCESS) { 3395 kqueue_threadreq_bind_commit(p, new_thread); 3396 } 3397 return ret; 3398 } 3399 3400 /* 3401 * Called with kqlock held. It does not need to take the process wide 3402 * global workq lock -> making it faster. 3403 */ 3404 void 3405 workq_kern_bound_thread_wakeup(struct workq_threadreq_s *kqr) 3406 { 3407 struct uthread *uth = get_bsdthread_info(kqr->tr_thread); 3408 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(kqr); 3409 3410 /* 3411 * See "Locking model for accessing uu_workq_flags" for more information 3412 * on how access to uu_workq_flags for the bound thread is synchronized. 3413 */ 3414 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING)) == 0); 3415 3416 if (trp.trp_flags & TRP_RELEASED) { 3417 uth->uu_workq_flags |= UT_WORKQ_DYING; 3418 } else { 3419 uth->uu_workq_flags |= UT_WORKQ_RUNNING; 3420 } 3421 3422 workq_thread_wakeup(uth); 3423 } 3424 3425 /* 3426 * Called with kqlock held. Dropped before parking. 3427 * It does not need to take process wide global workqueue 3428 * lock -> making it faster. 3429 */ 3430 __attribute__((noreturn, noinline)) 3431 void 3432 workq_kern_bound_thread_park(struct workq_threadreq_s *kqr) 3433 { 3434 struct uthread *uth = get_bsdthread_info(kqr->tr_thread); 3435 assert(uth == current_uthread()); 3436 3437 /* 3438 * See "Locking model for accessing uu_workq_flags" for more information 3439 * on how access to uu_workq_flags for the bound thread is synchronized. 3440 */ 3441 uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING); 3442 3443 thread_disarm_workqueue_quantum(get_machthread(uth)); 3444 3445 /* 3446 * TODO (pavhad) We could do the reusable userspace stack performance 3447 * optimization here. 3448 */ 3449 3450 kqworkloop_bound_thread_park_prepost(kqr); 3451 /* KQ_SLEEP bit is set and kqlock is dropped. */ 3452 3453 __assert_only kern_return_t kr; 3454 kr = thread_set_voucher_name(MACH_PORT_NULL); 3455 assert(kr == KERN_SUCCESS); 3456 3457 kqworkloop_bound_thread_park_commit(kqr, 3458 workq_parked_wait_event(uth), workq_bound_thread_unpark_continue); 3459 3460 __builtin_unreachable(); 3461 } 3462 3463 /* 3464 * To terminate the permenantly bound workqueue thread. It unbinds itself 3465 * with the kqwl during uthread_cleanup -> kqueue_threadreq_unbind. 3466 * It is also when it will release its reference on the kqwl. 3467 */ 3468 __attribute__((noreturn, noinline)) 3469 void 3470 workq_kern_bound_thread_terminate(struct workq_threadreq_s *kqr) 3471 { 3472 proc_t p = current_proc(); 3473 struct uthread *uth = get_bsdthread_info(kqr->tr_thread); 3474 uint16_t uu_workq_flags_orig; 3475 3476 assert(uth == current_uthread()); 3477 3478 /* 3479 * See "Locking model for accessing uu_workq_flags" for more information 3480 * on how access to uu_workq_flags for the bound thread is synchronized. 
3481 */ 3482 kqworkloop_bound_thread_terminate(kqr, &uu_workq_flags_orig); 3483 3484 if (uu_workq_flags_orig & UT_WORKQ_WORK_INTERVAL_JOINED) { 3485 __assert_only kern_return_t kr; 3486 kr = kern_work_interval_join(get_machthread(uth), MACH_PORT_NULL); 3487 /* The bound thread un-joins the work interval and drops its +1 ref. */ 3488 assert(kr == KERN_SUCCESS); 3489 } 3490 3491 /* 3492 * Drop the voucher now that we are on our way to termination. 3493 */ 3494 __assert_only kern_return_t kr; 3495 kr = thread_set_voucher_name(MACH_PORT_NULL); 3496 assert(kr == KERN_SUCCESS); 3497 3498 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI; 3499 upcall_flags |= uth->uu_save.uus_workq_park_data.qos | 3500 WQ_FLAG_THREAD_PRIO_QOS; 3501 3502 thread_t th = get_machthread(uth); 3503 vm_map_t vmap = get_task_map(proc_task(p)); 3504 3505 if ((uu_workq_flags_orig & UT_WORKQ_NEW) == 0) { 3506 upcall_flags |= WQ_FLAG_THREAD_REUSE; 3507 } 3508 3509 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 3510 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, upcall_flags); 3511 __builtin_unreachable(); 3512 } 3513 3514 void 3515 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags) 3516 { 3517 struct workqueue *wq = proc_get_wqptr_fast(p); 3518 3519 workq_lock_spin(wq); 3520 workq_schedule_creator(p, wq, flags); 3521 workq_unlock(wq); 3522 } 3523 3524 /* 3525 * Always called at AST by the thread on itself 3526 * 3527 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides 3528 * on what the thread should do next. The TSD value is always set by the thread 3529 * on itself in the kernel and cleared either by userspace when it acks the TSD 3530 * value and takes action, or by the thread in the kernel when the quantum 3531 * expires again. 3532 */ 3533 void 3534 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread) 3535 { 3536 struct uthread *uth = get_bsdthread_info(thread); 3537 3538 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 3539 return; 3540 } 3541 3542 if (!thread_supports_cooperative_workqueue(thread)) { 3543 panic("Quantum expired for thread that doesn't support cooperative workqueue"); 3544 } 3545 3546 thread_qos_t qos = uth->uu_workq_pri.qos_bucket; 3547 if (qos == THREAD_QOS_UNSPECIFIED) { 3548 panic("Thread should not have workq bucket of QoS UN"); 3549 } 3550 3551 assert(thread_has_expired_workqueue_quantum(thread, false)); 3552 3553 struct workqueue *wq = proc_get_wqptr(proc); 3554 assert(wq != NULL); 3555 3556 /* 3557 * For starters, we're just going to evaluate and see if we need to narrow 3558 * the pool and tell this thread to park if needed. In the future, we'll 3559 * evaluate and convey other workqueue state information like needing to 3560 * pump kevents, etc. 3561 */ 3562 uint64_t flags = 0; 3563 3564 workq_lock_spin(wq); 3565 3566 if (workq_thread_is_cooperative(uth)) { 3567 if (!workq_cooperative_allowance(wq, qos, uth, false)) { 3568 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW; 3569 } else { 3570 /* In the future, when we have kevent hookups for the cooperative 3571 * pool, we need fancier logic for what userspace should do. 
But 3572 * right now, only userspace thread requests exist - so we'll just 3573 * tell userspace to shuffle work items */ 3574 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE; 3575 } 3576 } else if (workq_thread_is_nonovercommit(uth)) { 3577 if (!workq_constrained_allowance(wq, qos, uth, false, false)) { 3578 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW; 3579 } 3580 } 3581 workq_unlock(wq); 3582 3583 WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0); 3584 3585 kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags); 3586 3587 /* We have conveyed to userspace about what it needs to do upon quantum 3588 * expiry, now rearm the workqueue quantum again */ 3589 thread_arm_workqueue_quantum(get_machthread(uth)); 3590 } 3591 3592 void 3593 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked) 3594 { 3595 if (locked) { 3596 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE); 3597 } else { 3598 workq_schedule_immediate_thread_creation(wq); 3599 } 3600 } 3601 3602 static int 3603 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap, 3604 struct workqueue *wq) 3605 { 3606 thread_t th = current_thread(); 3607 struct uthread *uth = get_bsdthread_info(th); 3608 workq_threadreq_t kqr = uth->uu_kqr_bound; 3609 workq_threadreq_param_t trp = { }; 3610 int nevents = uap->affinity, error; 3611 user_addr_t eventlist = uap->item; 3612 3613 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 3614 (uth->uu_workq_flags & UT_WORKQ_DYING)) { 3615 return EINVAL; 3616 } 3617 3618 if (eventlist && nevents && kqr == NULL) { 3619 return EINVAL; 3620 } 3621 3622 /* 3623 * Reset signal mask on the workqueue thread to default state, 3624 * but do not touch any signals that are marked for preservation. 3625 */ 3626 sigset_t resettable = uth->uu_sigmask & ~p->p_workq_allow_sigmask; 3627 if (resettable != (sigset_t)~workq_threadmask) { 3628 proc_lock(p); 3629 uth->uu_sigmask |= ~workq_threadmask & ~p->p_workq_allow_sigmask; 3630 proc_unlock(p); 3631 } 3632 3633 if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 3634 /* 3635 * Ensure we store the threadreq param before unbinding 3636 * the kqr from this thread. 3637 */ 3638 trp = kqueue_threadreq_workloop_param(kqr); 3639 } 3640 3641 if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_PERMANENT_BIND) { 3642 goto handle_stack_events; 3643 } 3644 3645 /* 3646 * Freeze the base pri while we decide the fate of this thread. 3647 * 3648 * Either: 3649 * - we return to user and kevent_cleanup will have unfrozen the base pri, 3650 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will. 
3651 */ 3652 thread_freeze_base_pri(th); 3653 3654 handle_stack_events: 3655 3656 if (kqr) { 3657 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE; 3658 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 3659 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 3660 } else { 3661 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 3662 } 3663 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 3664 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 3665 } else { 3666 if (workq_thread_is_overcommit(uth)) { 3667 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 3668 } 3669 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 3670 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 3671 } else { 3672 upcall_flags |= uth->uu_workq_pri.qos_req | 3673 WQ_FLAG_THREAD_PRIO_QOS; 3674 } 3675 } 3676 error = pthread_functions->workq_handle_stack_events(p, th, 3677 get_task_map(proc_task(p)), uth->uu_workq_stackaddr, 3678 uth->uu_workq_thport, eventlist, nevents, upcall_flags); 3679 if (error) { 3680 assert(uth->uu_kqr_bound == kqr); 3681 return error; 3682 } 3683 3684 // pthread is supposed to pass KEVENT_FLAG_PARKING here 3685 // which should cause the above call to either: 3686 // - not return 3687 // - return an error 3688 // - return 0 and have unbound properly 3689 assert(uth->uu_kqr_bound == NULL); 3690 } 3691 3692 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0); 3693 3694 thread_sched_call(th, NULL); 3695 thread_will_park_or_terminate(th); 3696 #if CONFIG_WORKLOOP_DEBUG 3697 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, }); 3698 #endif 3699 3700 workq_lock_spin(wq); 3701 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0); 3702 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value; 3703 workq_select_threadreq_or_park_and_unlock(p, wq, uth, 3704 WQ_SETUP_CLEAR_VOUCHER); 3705 __builtin_unreachable(); 3706 } 3707 3708 /** 3709 * Multiplexed call to interact with the workqueue mechanism 3710 */ 3711 int 3712 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval) 3713 { 3714 int options = uap->options; 3715 int arg2 = uap->affinity; 3716 int arg3 = uap->prio; 3717 struct workqueue *wq = proc_get_wqptr(p); 3718 int error = 0; 3719 3720 if ((p->p_lflag & P_LREGISTER) == 0) { 3721 return EINVAL; 3722 } 3723 3724 switch (options) { 3725 case WQOPS_QUEUE_NEWSPISUPP: { 3726 /* 3727 * arg2 = offset of serialno into dispatch queue 3728 * arg3 = kevent support 3729 */ 3730 int offset = arg2; 3731 if (arg3 & 0x01) { 3732 // If we get here, then userspace has indicated support for kevent delivery. 
3733 } 3734 3735 p->p_dispatchqueue_serialno_offset = (uint64_t)offset; 3736 break; 3737 } 3738 case WQOPS_QUEUE_REQTHREADS: { 3739 /* 3740 * arg2 = number of threads to start 3741 * arg3 = priority 3742 */ 3743 error = workq_reqthreads(p, arg2, arg3, false); 3744 break; 3745 } 3746 /* For requesting threads for the cooperative pool */ 3747 case WQOPS_QUEUE_REQTHREADS2: { 3748 /* 3749 * arg2 = number of threads to start 3750 * arg3 = priority 3751 */ 3752 error = workq_reqthreads(p, arg2, arg3, true); 3753 break; 3754 } 3755 case WQOPS_SET_EVENT_MANAGER_PRIORITY: { 3756 /* 3757 * arg2 = priority for the manager thread 3758 * 3759 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set, 3760 * the low bits of the value contains a scheduling priority 3761 * instead of a QOS value 3762 */ 3763 pthread_priority_t pri = arg2; 3764 3765 if (wq == NULL) { 3766 error = EINVAL; 3767 break; 3768 } 3769 3770 /* 3771 * Normalize the incoming priority so that it is ordered numerically. 3772 */ 3773 if (_pthread_priority_has_sched_pri(pri)) { 3774 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK | 3775 _PTHREAD_PRIORITY_SCHED_PRI_FLAG); 3776 } else { 3777 thread_qos_t qos = _pthread_priority_thread_qos(pri); 3778 int relpri = _pthread_priority_relpri(pri); 3779 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE || 3780 qos == THREAD_QOS_UNSPECIFIED) { 3781 error = EINVAL; 3782 break; 3783 } 3784 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK; 3785 } 3786 3787 /* 3788 * If userspace passes a scheduling priority, that wins over any QoS. 3789 * Userspace should takes care not to lower the priority this way. 3790 */ 3791 workq_lock_spin(wq); 3792 if (wq->wq_event_manager_priority < (uint32_t)pri) { 3793 wq->wq_event_manager_priority = (uint32_t)pri; 3794 } 3795 workq_unlock(wq); 3796 break; 3797 } 3798 case WQOPS_THREAD_KEVENT_RETURN: 3799 case WQOPS_THREAD_WORKLOOP_RETURN: 3800 case WQOPS_THREAD_RETURN: { 3801 error = workq_thread_return(p, uap, wq); 3802 break; 3803 } 3804 3805 case WQOPS_SHOULD_NARROW: { 3806 /* 3807 * arg2 = priority to test 3808 * arg3 = unused 3809 */ 3810 thread_t th = current_thread(); 3811 struct uthread *uth = get_bsdthread_info(th); 3812 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 3813 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) { 3814 error = EINVAL; 3815 break; 3816 } 3817 3818 thread_qos_t qos = _pthread_priority_thread_qos(arg2); 3819 if (qos == THREAD_QOS_UNSPECIFIED) { 3820 error = EINVAL; 3821 break; 3822 } 3823 workq_lock_spin(wq); 3824 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false, false); 3825 workq_unlock(wq); 3826 3827 *retval = should_narrow; 3828 break; 3829 } 3830 case WQOPS_SETUP_DISPATCH: { 3831 /* 3832 * item = pointer to workq_dispatch_config structure 3833 * arg2 = sizeof(item) 3834 */ 3835 struct workq_dispatch_config cfg; 3836 bzero(&cfg, sizeof(cfg)); 3837 3838 error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2)); 3839 if (error) { 3840 break; 3841 } 3842 3843 if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS || 3844 cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) { 3845 error = ENOTSUP; 3846 break; 3847 } 3848 3849 /* Load fields from version 1 */ 3850 p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs; 3851 3852 /* Load fields from version 2 */ 3853 if (cfg.wdc_version >= 2) { 3854 p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs; 3855 } 3856 3857 break; 3858 } 3859 default: 3860 error = EINVAL; 3861 break; 3862 } 3863 3864 return error; 3865 } 3866 3867 /* 3868 * We 
have no work to do, park ourselves on the idle list. 3869 * 3870 * Consumes the workqueue lock and does not return. 3871 */ 3872 __attribute__((noreturn, noinline)) 3873 static void 3874 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth, 3875 uint32_t setup_flags) 3876 { 3877 assert(uth == current_uthread()); 3878 assert(uth->uu_kqr_bound == NULL); 3879 workq_push_idle_thread(p, wq, uth, setup_flags); // may not return 3880 3881 workq_thread_reset_cpupercent(NULL, uth); 3882 3883 #if CONFIG_PREADOPT_TG 3884 /* Clear the preadoption thread group on the thread. 3885 * 3886 * Case 1: 3887 * Creator thread which never picked up a thread request. We set a 3888 * preadoption thread group on creator threads but if it never picked 3889 * up a thread request and didn't go to userspace, then the thread will 3890 * park with a preadoption thread group but no explicitly adopted 3891 * voucher or work interval. 3892 * 3893 * We drop the preadoption thread group here before proceeding to park. 3894 * Note - we may get preempted when we drop the workq lock below. 3895 * 3896 * Case 2: 3897 * Thread picked up a thread request and bound to it and returned back 3898 * from userspace and is parking. At this point, preadoption thread 3899 * group should be NULL since the thread has unbound from the thread 3900 * request. So this operation should be a no-op. 3901 */ 3902 thread_set_preadopt_thread_group(get_machthread(uth), NULL); 3903 #endif 3904 3905 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) && 3906 !(uth->uu_workq_flags & UT_WORKQ_DYING)) { 3907 workq_unlock(wq); 3908 3909 /* 3910 * workq_push_idle_thread() will unset `has_stack` 3911 * if it wants us to free the stack before parking. 3912 */ 3913 if (!uth->uu_save.uus_workq_park_data.has_stack) { 3914 pthread_functions->workq_markfree_threadstack(p, 3915 get_machthread(uth), get_task_map(proc_task(p)), 3916 uth->uu_workq_stackaddr); 3917 } 3918 3919 /* 3920 * When we remove the voucher from the thread, we may lose our importance 3921 * causing us to get preempted, so we do this after putting the thread on 3922 * the idle list. Then, when we get our importance back we'll be able to 3923 * use this thread from e.g. the kevent call out to deliver a boosting 3924 * message. 3925 * 3926 * Note that setting the voucher to NULL will not clear the preadoption 3927 * thread since this thread could have become the creator again and 3928 * perhaps acquired a preadoption thread group. 3929 */ 3930 __assert_only kern_return_t kr; 3931 kr = thread_set_voucher_name(MACH_PORT_NULL); 3932 assert(kr == KERN_SUCCESS); 3933 3934 workq_lock_spin(wq); 3935 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP; 3936 setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER; 3937 } 3938 3939 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0); 3940 3941 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) { 3942 /* 3943 * While we'd dropped the lock to unset our voucher, someone came 3944 * around and made us runnable. But because we weren't waiting on the 3945 * event their thread_wakeup() was ineffectual. To correct for that, 3946 * we just run the continuation ourselves. 
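 * (i.e. we fall through to workq_unpark_select_threadreq_or_park_and_unlock() below instead of blocking.)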
3947 */ 3948 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags); 3949 __builtin_unreachable(); 3950 } 3951 3952 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 3953 workq_unpark_for_death_and_unlock(p, wq, uth, 3954 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags); 3955 __builtin_unreachable(); 3956 } 3957 3958 /* Disarm the workqueue quantum since the thread is now idle */ 3959 thread_disarm_workqueue_quantum(get_machthread(uth)); 3960 3961 thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue); 3962 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE); 3963 workq_unlock(wq); 3964 thread_block(workq_unpark_continue); 3965 __builtin_unreachable(); 3966 } 3967 3968 static inline bool 3969 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth) 3970 { 3971 /* 3972 * There's an event manager request and either: 3973 * - no event manager currently running 3974 * - we are re-using the event manager 3975 */ 3976 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 || 3977 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER); 3978 } 3979 3980 /* Called with workq lock held. */ 3981 static uint32_t 3982 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos, 3983 struct uthread *uth, bool may_start_timer, bool record_failed_allowance) 3984 { 3985 assert(at_qos != WORKQ_THREAD_QOS_MANAGER); 3986 uint32_t allowance_passed = 0; 3987 uint32_t count = 0; 3988 3989 uint32_t max_count = wq->wq_constrained_threads_scheduled; 3990 if (uth && workq_thread_is_nonovercommit(uth)) { 3991 /* 3992 * don't count the current thread as scheduled 3993 */ 3994 assert(max_count > 0); 3995 max_count--; 3996 } 3997 if (max_count >= wq_max_constrained_threads) { 3998 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1, 3999 wq->wq_constrained_threads_scheduled, 4000 wq_max_constrained_threads); 4001 /* 4002 * we need 1 or more constrained threads to return to the kernel before 4003 * we can dispatch additional work 4004 */ 4005 allowance_passed = 0; 4006 goto out; 4007 } 4008 max_count -= wq_max_constrained_threads; 4009 4010 /* 4011 * Compute a metric for how many threads are active. We find the 4012 * highest priority request outstanding and then add up the number of active 4013 * threads in that and all higher-priority buckets. We'll also add any 4014 * "busy" threads which are not currently active but blocked recently enough 4015 * that we can't be sure that they won't be unblocked soon and start 4016 * being active again. 4017 * 4018 * We'll then compare this metric to our max concurrency to decide whether 4019 * to add a new thread. 4020 */ 4021 4022 uint32_t busycount, thactive_count; 4023 4024 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq), 4025 at_qos, &busycount, NULL); 4026 4027 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER && 4028 at_qos <= uth->uu_workq_pri.qos_bucket) { 4029 /* 4030 * Don't count this thread as currently active, but only if it's not 4031 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active 4032 * managers.
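 * For example (illustrative numbers only): if wq_max_parallelism[_wq_bucket(at_qos)] is 8, thactive_count is 5 and busycount is 2, the computation below allows at most 8 - (5 + 2) = 1 additional constrained thread, further capped by max_count.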
4033 */ 4034 assert(thactive_count > 0); 4035 thactive_count--; 4036 } 4037 4038 count = wq_max_parallelism[_wq_bucket(at_qos)]; 4039 if (count > thactive_count + busycount) { 4040 count -= thactive_count + busycount; 4041 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2, 4042 thactive_count, busycount); 4043 allowance_passed = MIN(count, max_count); 4044 goto out; 4045 } else { 4046 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3, 4047 thactive_count, busycount); 4048 allowance_passed = 0; 4049 } 4050 4051 if (may_start_timer) { 4052 /* 4053 * If this is called from the add timer, we won't have another timer 4054 * fire when the thread exits the "busy" state, so rearm the timer. 4055 */ 4056 workq_schedule_delayed_thread_creation(wq, 0); 4057 } 4058 4059 out: 4060 if (record_failed_allowance) { 4061 wq->wq_exceeded_active_constrained_thread_limit = !allowance_passed; 4062 } 4063 return allowance_passed; 4064 } 4065 4066 static bool 4067 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 4068 workq_threadreq_t req) 4069 { 4070 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 4071 return workq_may_start_event_mgr_thread(wq, uth); 4072 } 4073 if (workq_threadreq_is_cooperative(req)) { 4074 return workq_cooperative_allowance(wq, req->tr_qos, uth, true); 4075 } 4076 if (workq_threadreq_is_nonovercommit(req)) { 4077 return workq_constrained_allowance(wq, req->tr_qos, uth, true, true); 4078 } 4079 4080 return true; 4081 } 4082 4083 /* 4084 * Called from the context of selecting thread requests for threads returning 4085 * from userspace or the creator thread 4086 */ 4087 static workq_threadreq_t 4088 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth) 4089 { 4090 workq_lock_held(wq); 4091 4092 /* 4093 * If the current thread is cooperative, we need to exclude it from the 4094 * cooperative schedule count since this thread is looking for a new 4095 * request. A change in the schedule count for the cooperative pool therefore 4096 * requires us to reevaluate the next best request for it.
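 * That is what the code below does: this thread's contribution is briefly decremented, wq_cooperative_queue_best_req_qos is refreshed, and the count is restored.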
4097 */ 4098 if (uth && workq_thread_is_cooperative(uth)) { 4099 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req); 4100 4101 (void) _wq_cooperative_queue_refresh_best_req_qos(wq); 4102 4103 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req); 4104 } else { 4105 /* 4106 * The old value that was already precomputed should be safe to use - 4107 * add an assert that asserts that the best req QoS doesn't change in 4108 * this case 4109 */ 4110 assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false); 4111 } 4112 4113 thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos; 4114 4115 /* There are no eligible requests in the cooperative pool */ 4116 if (qos == THREAD_QOS_UNSPECIFIED) { 4117 return NULL; 4118 } 4119 assert(qos != WORKQ_THREAD_QOS_ABOVEUI); 4120 assert(qos != WORKQ_THREAD_QOS_MANAGER); 4121 4122 uint8_t bucket = _wq_bucket(qos); 4123 assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])); 4124 4125 return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]); 4126 } 4127 4128 static workq_threadreq_t 4129 workq_threadreq_select_for_creator(struct workqueue *wq) 4130 { 4131 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 4132 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 4133 uint8_t pri = 0; 4134 4135 /* 4136 * Compute the best priority request, and ignore the turnstile for now 4137 */ 4138 4139 req_pri = priority_queue_max(&wq->wq_special_queue, 4140 struct workq_threadreq_s, tr_entry); 4141 if (req_pri) { 4142 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue, 4143 &req_pri->tr_entry); 4144 } 4145 4146 /* 4147 * Handle the manager thread request. The special queue might yield 4148 * a higher priority, but the manager always beats the QoS world. 4149 */ 4150 4151 req_mgr = wq->wq_event_manager_threadreq; 4152 if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) { 4153 uint32_t mgr_pri = wq->wq_event_manager_priority; 4154 4155 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 4156 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 4157 } else { 4158 mgr_pri = thread_workq_pri_for_qos( 4159 _pthread_priority_thread_qos(mgr_pri)); 4160 } 4161 4162 return mgr_pri >= pri ? req_mgr : req_pri; 4163 } 4164 4165 /* 4166 * Compute the best QoS Request, and check whether it beats the "pri" one 4167 * 4168 * Start by comparing the overcommit and the cooperative pool 4169 */ 4170 req_qos = priority_queue_max(&wq->wq_overcommit_queue, 4171 struct workq_threadreq_s, tr_entry); 4172 if (req_qos) { 4173 qos = req_qos->tr_qos; 4174 } 4175 4176 req_tmp = workq_cooperative_queue_best_req(wq, NULL); 4177 if (req_tmp && qos <= req_tmp->tr_qos) { 4178 /* 4179 * Cooperative TR is better between overcommit and cooperative. Note 4180 * that if qos is same between overcommit and cooperative, we choose 4181 * cooperative. 4182 * 4183 * Pick cooperative pool if it passes the admissions check 4184 */ 4185 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) { 4186 req_qos = req_tmp; 4187 qos = req_qos->tr_qos; 4188 } 4189 } 4190 4191 /* 4192 * Compare the best QoS so far - either from overcommit or from cooperative 4193 * pool - and compare it with the constrained pool 4194 */ 4195 req_tmp = priority_queue_max(&wq->wq_constrained_queue, 4196 struct workq_threadreq_s, tr_entry); 4197 4198 if (req_tmp && qos < req_tmp->tr_qos) { 4199 /* 4200 * Constrained pool is best in QoS between overcommit, cooperative 4201 * and constrained. 
Now check how it fares against the priority case 4202 */ 4203 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) { 4204 return req_pri; 4205 } 4206 4207 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true, true)) { 4208 /* 4209 * If the constrained thread request is the best one and passes 4210 * the admission check, pick it. 4211 */ 4212 return req_tmp; 4213 } 4214 } 4215 4216 /* 4217 * Compare the best of the QoS world with the priority 4218 */ 4219 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) { 4220 return req_pri; 4221 } 4222 4223 if (req_qos) { 4224 return req_qos; 4225 } 4226 4227 /* 4228 * If we had no eligible request but we have a turnstile push, 4229 * it must be a non-overcommit thread request that failed 4230 * the admission check. 4231 * 4232 * Just fake a BG thread request so that if the push stops, the creator 4233 * priority just drops to 4. 4234 */ 4235 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) { 4236 static struct workq_threadreq_s workq_sync_push_fake_req = { 4237 .tr_qos = THREAD_QOS_BACKGROUND, 4238 }; 4239 4240 return &workq_sync_push_fake_req; 4241 } 4242 4243 return NULL; 4244 } 4245 4246 /* 4247 * Returns true if this caused a change in the schedule counts of the 4248 * cooperative pool 4249 */ 4250 static bool 4251 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq, 4252 struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags) 4253 { 4254 workq_lock_held(wq); 4255 4256 /* 4257 * Row: thread type 4258 * Column: Request type 4259 * 4260 * overcommit non-overcommit cooperative 4261 * overcommit X case 1 case 2 4262 * cooperative case 3 case 4 case 5 4263 * non-overcommit case 6 X case 7 4264 * 4265 * Move the thread to the right bucket depending on what state it currently 4266 * has and what state the thread request it picks is going to have. 4267 * 4268 * Note that the creator thread is an overcommit thread.
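 * The two 'X' cells need no adjustment: a thread picking up a request of its own kind leaves both the constrained and cooperative scheduled counts unchanged.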
4269 */ 4270 thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req; 4271 4272 /* 4273 * Anytime a cooperative bucket's schedule count changes, we need to 4274 * potentially refresh the next best QoS for that pool when we determine 4275 * the next request for the creator 4276 */ 4277 bool cooperative_pool_sched_count_changed = false; 4278 4279 if (workq_thread_is_overcommit(uth)) { 4280 if (workq_tr_is_nonovercommit(tr_flags)) { 4281 // Case 1: thread is overcommit, req is non-overcommit 4282 wq->wq_constrained_threads_scheduled++; 4283 } else if (workq_tr_is_cooperative(tr_flags)) { 4284 // Case 2: thread is overcommit, req is cooperative 4285 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 4286 cooperative_pool_sched_count_changed = true; 4287 } 4288 } else if (workq_thread_is_cooperative(uth)) { 4289 if (workq_tr_is_overcommit(tr_flags)) { 4290 // Case 3: thread is cooperative, req is overcommit 4291 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 4292 } else if (workq_tr_is_nonovercommit(tr_flags)) { 4293 // Case 4: thread is cooperative, req is non-overcommit 4294 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 4295 wq->wq_constrained_threads_scheduled++; 4296 } else { 4297 // Case 5: thread is cooperative, req is also cooperative 4298 assert(workq_tr_is_cooperative(tr_flags)); 4299 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 4300 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 4301 } 4302 cooperative_pool_sched_count_changed = true; 4303 } else { 4304 if (workq_tr_is_overcommit(tr_flags)) { 4305 // Case 6: Thread is non-overcommit, req is overcommit 4306 wq->wq_constrained_threads_scheduled--; 4307 } else if (workq_tr_is_cooperative(tr_flags)) { 4308 // Case 7: Thread is non-overcommit, req is cooperative 4309 wq->wq_constrained_threads_scheduled--; 4310 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 4311 cooperative_pool_sched_count_changed = true; 4312 } 4313 } 4314 4315 return cooperative_pool_sched_count_changed; 4316 } 4317 4318 static workq_threadreq_t 4319 workq_threadreq_select(struct workqueue *wq, struct uthread *uth) 4320 { 4321 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 4322 uintptr_t proprietor; 4323 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 4324 uint8_t pri = 0; 4325 4326 if (uth == wq->wq_creator) { 4327 uth = NULL; 4328 } 4329 4330 /* 4331 * Compute the best priority request (special or turnstile) 4332 */ 4333 4334 pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, 4335 &proprietor); 4336 if (pri) { 4337 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor; 4338 req_pri = &kqwl->kqwl_request; 4339 if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) { 4340 panic("Invalid thread request (%p) state %d", 4341 req_pri, req_pri->tr_state); 4342 } 4343 } else { 4344 req_pri = NULL; 4345 } 4346 4347 req_tmp = priority_queue_max(&wq->wq_special_queue, 4348 struct workq_threadreq_s, tr_entry); 4349 if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue, 4350 &req_tmp->tr_entry)) { 4351 req_pri = req_tmp; 4352 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue, 4353 &req_tmp->tr_entry); 4354 } 4355 4356 /* 4357 * Handle the manager thread request. The special queue might yield 4358 * a higher priority, but the manager always beats the QoS world. 
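 * (wq_event_manager_priority is normalized to a raw scheduler priority below before it is compared against 'pri'.)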
4359 */ 4360 4361 req_mgr = wq->wq_event_manager_threadreq; 4362 if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) { 4363 uint32_t mgr_pri = wq->wq_event_manager_priority; 4364 4365 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 4366 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 4367 } else { 4368 mgr_pri = thread_workq_pri_for_qos( 4369 _pthread_priority_thread_qos(mgr_pri)); 4370 } 4371 4372 return mgr_pri >= pri ? req_mgr : req_pri; 4373 } 4374 4375 /* 4376 * Compute the best QoS Request, and check whether it beats the "pri" one 4377 */ 4378 4379 req_qos = priority_queue_max(&wq->wq_overcommit_queue, 4380 struct workq_threadreq_s, tr_entry); 4381 if (req_qos) { 4382 qos = req_qos->tr_qos; 4383 } 4384 4385 req_tmp = workq_cooperative_queue_best_req(wq, uth); 4386 if (req_tmp && qos <= req_tmp->tr_qos) { 4387 /* 4388 * Cooperative TR is better between overcommit and cooperative. Note 4389 * that if qos is same between overcommit and cooperative, we choose 4390 * cooperative. 4391 * 4392 * Pick cooperative pool if it passes the admissions check 4393 */ 4394 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) { 4395 req_qos = req_tmp; 4396 qos = req_qos->tr_qos; 4397 } 4398 } 4399 4400 /* 4401 * Compare the best QoS so far - either from overcommit or from cooperative 4402 * pool - and compare it with the constrained pool 4403 */ 4404 req_tmp = priority_queue_max(&wq->wq_constrained_queue, 4405 struct workq_threadreq_s, tr_entry); 4406 4407 if (req_tmp && qos < req_tmp->tr_qos) { 4408 /* 4409 * Constrained pool is best in QoS between overcommit, cooperative 4410 * and constrained. Now check how it fares against the priority case 4411 */ 4412 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) { 4413 return req_pri; 4414 } 4415 4416 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true, true)) { 4417 /* 4418 * If the constrained thread request is the best one and passes 4419 * the admission check, pick it. 4420 */ 4421 return req_tmp; 4422 } 4423 } 4424 4425 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) { 4426 return req_pri; 4427 } 4428 4429 return req_qos; 4430 } 4431 4432 /* 4433 * The creator is an anonymous thread that is counted as scheduled, 4434 * but otherwise neither has its scheduler callback set nor is tracked as 4435 * active; it is used to make other threads. 4436 * 4437 * When more requests are added or an existing one is hurried along, 4438 * a creator is elected and set up, or the existing one is overridden accordingly. 4439 * 4440 * While this creator is in flight, because no request has been dequeued, 4441 * already running threads have a chance at stealing thread requests, avoiding 4442 * useless context switches, and the creator, once scheduled, may not find any 4443 * work to do and will then just park again. 4444 * 4445 * The creator serves the dual purpose of informing the scheduler of work that 4446 * hasn't been materialized as threads yet, and of acting as a natural pacing 4447 * mechanism for thread creation. 4448 * 4449 * Because it is anonymous (and not bound to anything), thread requests 4450 * can be stolen from this creator by threads already on core, yielding more 4451 * efficient scheduling and fewer context switches.
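 * workq_schedule_creator() below implements this: it either adjusts the priority of the existing creator, unparks an idle thread as the new creator, or falls back to creating (or scheduling creation of) a new thread.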
4452 */ 4453 static void 4454 workq_schedule_creator(proc_t p, struct workqueue *wq, 4455 workq_kern_threadreq_flags_t flags) 4456 { 4457 workq_threadreq_t req; 4458 struct uthread *uth; 4459 bool needs_wakeup; 4460 4461 workq_lock_held(wq); 4462 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0); 4463 4464 again: 4465 uth = wq->wq_creator; 4466 4467 if (!wq->wq_reqcount) { 4468 /* 4469 * There is no thread request left. 4470 * 4471 * If there is a creator, leave everything in place, so that it cleans 4472 * up itself in workq_push_idle_thread(). 4473 * 4474 * Else, make sure the turnstile state is reset to no inheritor. 4475 */ 4476 if (uth == NULL) { 4477 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 4478 } 4479 return; 4480 } 4481 4482 req = workq_threadreq_select_for_creator(wq); 4483 if (req == NULL) { 4484 /* 4485 * There isn't a thread request that passes the admission check. 4486 * 4487 * If there is a creator, do not touch anything, the creator will sort 4488 * it out when it runs. 4489 * 4490 * Else, set the inheritor to "WORKQ" so that the turnstile propagation 4491 * code calls us if anything changes. 4492 */ 4493 if (uth == NULL) { 4494 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ); 4495 } 4496 return; 4497 } 4498 4499 4500 if (uth) { 4501 /* 4502 * We need to maybe override the creator we already have 4503 */ 4504 if (workq_thread_needs_priority_change(req, uth)) { 4505 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 4506 wq, 1, uthread_tid(uth), req->tr_qos); 4507 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4508 } 4509 assert(wq->wq_inheritor == get_machthread(uth)); 4510 } else if (wq->wq_thidlecount) { 4511 /* 4512 * We need to unpark a creator thread 4513 */ 4514 wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT, 4515 &needs_wakeup); 4516 /* Always reset the priorities on the newly chosen creator */ 4517 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4518 workq_turnstile_update_inheritor(wq, get_machthread(uth), 4519 TURNSTILE_INHERITOR_THREAD); 4520 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 4521 wq, 2, uthread_tid(uth), req->tr_qos); 4522 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 4523 uth->uu_save.uus_workq_park_data.yields = 0; 4524 if (needs_wakeup) { 4525 workq_thread_wakeup(uth); 4526 } 4527 } else { 4528 /* 4529 * We need to allocate a thread... 4530 */ 4531 if (__improbable(wq->wq_nthreads >= wq_max_threads)) { 4532 /* out of threads, just go away */ 4533 flags = WORKQ_THREADREQ_NONE; 4534 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) { 4535 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ); 4536 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) { 4537 /* This can drop the workqueue lock, and take it again */ 4538 workq_schedule_immediate_thread_creation(wq); 4539 } else if ((workq_add_new_idle_thread(p, wq, 4540 workq_unpark_continue, false, NULL) == KERN_SUCCESS)) { 4541 goto again; 4542 } else { 4543 workq_schedule_delayed_thread_creation(wq, 0); 4544 } 4545 4546 /* 4547 * If the current thread is the inheritor: 4548 * 4549 * If we set the AST, then the thread will stay the inheritor until 4550 * either the AST calls workq_kern_threadreq_redrive(), or it parks 4551 * and calls workq_push_idle_thread(). 4552 * 4553 * Else, the responsibility of the thread creation is with a thread-call 4554 * and we need to clear the inheritor. 
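 * That is what the check below does: if no AST was set and this thread is still the inheritor, the turnstile inheritor is reset to NULL.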
4555 */ 4556 if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 && 4557 wq->wq_inheritor == current_thread()) { 4558 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 4559 } 4560 } 4561 } 4562 4563 /** 4564 * Same as workq_unpark_select_threadreq_or_park_and_unlock, 4565 * but do not allow early binds. 4566 * 4567 * Called with the base pri frozen, will unfreeze it. 4568 */ 4569 __attribute__((noreturn, noinline)) 4570 static void 4571 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 4572 struct uthread *uth, uint32_t setup_flags) 4573 { 4574 workq_threadreq_t req = NULL; 4575 bool is_creator = (wq->wq_creator == uth); 4576 bool schedule_creator = false; 4577 4578 if (__improbable(_wq_exiting(wq))) { 4579 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0); 4580 goto park; 4581 } 4582 4583 if (wq->wq_reqcount == 0) { 4584 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0); 4585 goto park; 4586 } 4587 4588 req = workq_threadreq_select(wq, uth); 4589 if (__improbable(req == NULL)) { 4590 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0); 4591 goto park; 4592 } 4593 4594 struct uu_workq_policy old_pri = uth->uu_workq_pri; 4595 uint8_t tr_flags = req->tr_flags; 4596 struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req); 4597 4598 /* 4599 * Attempt to setup ourselves as the new thing to run, moving all priority 4600 * pushes to ourselves. 4601 * 4602 * If the current thread is the creator, then the fact that we are presently 4603 * running is proof that we'll do something useful, so keep going. 4604 * 4605 * For other cases, peek at the AST to know whether the scheduler wants 4606 * to preempt us, if yes, park instead, and move the thread request 4607 * turnstile back to the workqueue. 4608 */ 4609 if (req_ts) { 4610 workq_perform_turnstile_operation_locked(wq, ^{ 4611 turnstile_update_inheritor(req_ts, get_machthread(uth), 4612 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD); 4613 turnstile_update_inheritor_complete(req_ts, 4614 TURNSTILE_INTERLOCK_HELD); 4615 }); 4616 } 4617 4618 /* accounting changes of aggregate thscheduled_count and thactive which has 4619 * to be paired with the workq_thread_reset_pri below so that we have 4620 * uth->uu_workq_pri match with thactive. 4621 * 4622 * This is undone when the thread parks */ 4623 if (is_creator) { 4624 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0, 4625 uth->uu_save.uus_workq_park_data.yields); 4626 wq->wq_creator = NULL; 4627 _wq_thactive_inc(wq, req->tr_qos); 4628 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++; 4629 } else if (old_pri.qos_bucket != req->tr_qos) { 4630 _wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos); 4631 } 4632 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4633 4634 /* 4635 * Make relevant accounting changes for pool specific counts. 4636 * 4637 * The schedule counts changing can affect what the next best request 4638 * for cooperative thread pool is if this request is dequeued. 
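 * The result is captured in cooperative_sched_count_changed and later handed to workq_threadreq_dequeue() so that the best request QoS can be refreshed.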
4639 */ 4640 bool cooperative_sched_count_changed = 4641 workq_adjust_cooperative_constrained_schedule_counts(wq, uth, 4642 old_pri.qos_req, tr_flags); 4643 4644 if (workq_tr_is_overcommit(tr_flags)) { 4645 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT); 4646 } else if (workq_tr_is_cooperative(tr_flags)) { 4647 workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE); 4648 } else { 4649 workq_thread_set_type(uth, 0); 4650 } 4651 4652 if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) { 4653 if (req_ts) { 4654 workq_perform_turnstile_operation_locked(wq, ^{ 4655 turnstile_update_inheritor(req_ts, wq->wq_turnstile, 4656 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 4657 turnstile_update_inheritor_complete(req_ts, 4658 TURNSTILE_INTERLOCK_HELD); 4659 }); 4660 } 4661 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0); 4662 4663 /* 4664 * If a cooperative thread was the one which picked up the manager 4665 * thread request, we need to reevaluate the cooperative pool before 4666 * it goes and parks. 4667 * 4668 * For every other of thread request that it picks up, the logic in 4669 * workq_threadreq_select should have done this refresh. 4670 * See workq_push_idle_thread. 4671 */ 4672 if (cooperative_sched_count_changed) { 4673 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 4674 _wq_cooperative_queue_refresh_best_req_qos(wq); 4675 } 4676 } 4677 goto park_thawed; 4678 } 4679 4680 /* 4681 * We passed all checks, dequeue the request, bind to it, and set it up 4682 * to return to user. 4683 */ 4684 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 4685 workq_trace_req_id(req), tr_flags, 0); 4686 wq->wq_fulfilled++; 4687 schedule_creator = workq_threadreq_dequeue(wq, req, 4688 cooperative_sched_count_changed); 4689 4690 workq_thread_reset_cpupercent(req, uth); 4691 4692 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 4693 kqueue_threadreq_bind_prepost(p, req, uth); 4694 req = NULL; 4695 } else if (req->tr_count > 0) { 4696 req = NULL; 4697 } 4698 4699 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 4700 uth->uu_workq_flags ^= UT_WORKQ_NEW; 4701 setup_flags |= WQ_SETUP_FIRST_USE; 4702 } 4703 4704 /* If one of the following is true, call workq_schedule_creator (which also 4705 * adjusts priority of existing creator): 4706 * 4707 * - We are the creator currently so the wq may need a new creator 4708 * - The request we're binding to is the highest priority one, existing 4709 * creator's priority might need to be adjusted to reflect the next 4710 * highest TR 4711 */ 4712 if (is_creator || schedule_creator) { 4713 /* This can drop the workqueue lock, and take it again */ 4714 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 4715 } 4716 4717 workq_unlock(wq); 4718 4719 if (req) { 4720 zfree(workq_zone_threadreq, req); 4721 } 4722 4723 /* 4724 * Run Thread, Run! 
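 * Assemble the upcall flags that describe to the userspace trampoline how this thread is being used (event manager, overcommit or cooperative, and whether kevents / a workloop are attached).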
4725 */ 4726 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI; 4727 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 4728 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 4729 } else if (workq_tr_is_overcommit(tr_flags)) { 4730 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 4731 } else if (workq_tr_is_cooperative(tr_flags)) { 4732 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE; 4733 } 4734 if (tr_flags & WORKQ_TR_FLAG_KEVENT) { 4735 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 4736 assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0); 4737 } 4738 4739 if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 4740 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 4741 } 4742 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 4743 4744 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 4745 kqueue_threadreq_bind_commit(p, get_machthread(uth)); 4746 } else { 4747 #if CONFIG_PREADOPT_TG 4748 /* 4749 * The thread may have a preadopt thread group on it already because it 4750 * got tagged with it as a creator thread. So we need to make sure to 4751 * clear that since we don't have preadoption for anonymous thread 4752 * requests 4753 */ 4754 thread_set_preadopt_thread_group(get_machthread(uth), NULL); 4755 #endif 4756 } 4757 4758 workq_setup_and_run(p, uth, setup_flags); 4759 __builtin_unreachable(); 4760 4761 park: 4762 thread_unfreeze_base_pri(get_machthread(uth)); 4763 park_thawed: 4764 workq_park_and_unlock(p, wq, uth, setup_flags); 4765 } 4766 4767 /** 4768 * Runs a thread request on a thread 4769 * 4770 * - if thread is THREAD_NULL, will find a thread and run the request there. 4771 * Otherwise, the thread must be the current thread. 4772 * 4773 * - if req is NULL, will find the highest priority request and run that. If 4774 * it is not NULL, it must be a threadreq object in state NEW. If it can not 4775 * be run immediately, it will be enqueued and moved to state QUEUED. 4776 * 4777 * Either way, the thread request object serviced will be moved to state 4778 * BINDING and attached to the uthread. 4779 * 4780 * Should be called with the workqueue lock held. Will drop it. 4781 * Should be called with the base pri not frozen. 4782 */ 4783 __attribute__((noreturn, noinline)) 4784 static void 4785 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 4786 struct uthread *uth, uint32_t setup_flags) 4787 { 4788 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) { 4789 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 4790 setup_flags |= WQ_SETUP_FIRST_USE; 4791 } 4792 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND); 4793 /* 4794 * This pointer is possibly freed and only used for tracing purposes. 
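 * (It is therefore only logged via VM_KERNEL_ADDRHIDE below and never dereferenced.)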
4795 */ 4796 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request; 4797 workq_unlock(wq); 4798 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 4799 VM_KERNEL_ADDRHIDE(req), 0, 0); 4800 (void)req; 4801 4802 workq_setup_and_run(p, uth, setup_flags); 4803 __builtin_unreachable(); 4804 } 4805 4806 thread_freeze_base_pri(get_machthread(uth)); 4807 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags); 4808 } 4809 4810 static bool 4811 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth) 4812 { 4813 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 4814 4815 if (qos >= THREAD_QOS_USER_INTERACTIVE) { 4816 return false; 4817 } 4818 4819 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot; 4820 if (wq->wq_fulfilled == snapshot) { 4821 return false; 4822 } 4823 4824 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)]; 4825 if (wq->wq_fulfilled - snapshot > conc) { 4826 /* we fulfilled more than NCPU requests since being dispatched */ 4827 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1, 4828 wq->wq_fulfilled, snapshot); 4829 return true; 4830 } 4831 4832 for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) { 4833 cnt += wq->wq_thscheduled_count[i]; 4834 } 4835 if (conc <= cnt) { 4836 /* We fulfilled requests and have more than NCPU scheduled threads */ 4837 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2, 4838 wq->wq_fulfilled, snapshot); 4839 return true; 4840 } 4841 4842 return false; 4843 } 4844 4845 /** 4846 * parked idle thread wakes up 4847 */ 4848 __attribute__((noreturn, noinline)) 4849 static void 4850 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused) 4851 { 4852 thread_t th = current_thread(); 4853 struct uthread *uth = get_bsdthread_info(th); 4854 proc_t p = current_proc(); 4855 struct workqueue *wq = proc_get_wqptr_fast(p); 4856 4857 workq_lock_spin(wq); 4858 4859 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) { 4860 /* 4861 * If the number of threads we have out are able to keep up with the 4862 * demand, then we should avoid sending this creator thread to 4863 * userspace. 4864 */ 4865 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 4866 uth->uu_save.uus_workq_park_data.yields++; 4867 workq_unlock(wq); 4868 thread_yield_with_continuation(workq_unpark_continue, NULL); 4869 __builtin_unreachable(); 4870 } 4871 4872 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) { 4873 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE); 4874 __builtin_unreachable(); 4875 } 4876 4877 if (__probable(wr == THREAD_AWAKENED)) { 4878 /* 4879 * We were set running, but for the purposes of dying. 4880 */ 4881 assert(uth->uu_workq_flags & UT_WORKQ_DYING); 4882 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0); 4883 } else { 4884 /* 4885 * workaround for <rdar://problem/38647347>, 4886 * in case we do hit userspace, make sure calling 4887 * workq_thread_terminate() does the right thing here, 4888 * and if we never call it, that workq_exit() will too because it sees 4889 * this thread on the runlist. 
4890 */ 4891 assert(wr == THREAD_INTERRUPTED); 4892 wq->wq_thdying_count++; 4893 uth->uu_workq_flags |= UT_WORKQ_DYING; 4894 } 4895 4896 workq_unpark_for_death_and_unlock(p, wq, uth, 4897 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE); 4898 __builtin_unreachable(); 4899 } 4900 4901 __attribute__((noreturn, noinline)) 4902 static void 4903 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags) 4904 { 4905 thread_t th = get_machthread(uth); 4906 vm_map_t vmap = get_task_map(proc_task(p)); 4907 4908 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 4909 /* 4910 * For preemption reasons, we want to reset the voucher as late as 4911 * possible, so we do it in two places: 4912 * - Just before parking (i.e. in workq_park_and_unlock()) 4913 * - Prior to doing the setup for the next workitem (i.e. here) 4914 * 4915 * Those two places are sufficient to ensure we always reset it before 4916 * it goes back out to user space, but be careful to not break that 4917 * guarantee. 4918 * 4919 * Note that setting the voucher to NULL will not clear the preadoption 4920 * thread group on this thread 4921 */ 4922 __assert_only kern_return_t kr; 4923 kr = thread_set_voucher_name(MACH_PORT_NULL); 4924 assert(kr == KERN_SUCCESS); 4925 } 4926 4927 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags; 4928 if (!(setup_flags & WQ_SETUP_FIRST_USE)) { 4929 upcall_flags |= WQ_FLAG_THREAD_REUSE; 4930 } 4931 4932 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 4933 /* 4934 * For threads that have an outside-of-QoS thread priority, indicate 4935 * to userspace that setting QoS should only affect the TSD and not 4936 * change QOS in the kernel. 4937 */ 4938 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 4939 } else { 4940 /* 4941 * Put the QoS class value into the lower bits of the reuse_thread 4942 * register, this is where the thread priority used to be stored 4943 * anyway. 4944 */ 4945 upcall_flags |= uth->uu_save.uus_workq_park_data.qos | 4946 WQ_FLAG_THREAD_PRIO_QOS; 4947 } 4948 4949 if (uth->uu_workq_thport == MACH_PORT_NULL) { 4950 /* convert_thread_to_port_pinned() consumes a reference */ 4951 thread_reference(th); 4952 /* Convert to immovable/pinned thread port, but port is not pinned yet */ 4953 ipc_port_t port = convert_thread_to_port_pinned(th); 4954 /* Atomically, pin and copy out the port */ 4955 uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p))); 4956 } 4957 4958 /* Thread has been set up to run, arm its next workqueue quantum or disarm 4959 * if it is no longer supporting that */ 4960 if (thread_supports_cooperative_workqueue(th)) { 4961 thread_arm_workqueue_quantum(th); 4962 } else { 4963 thread_disarm_workqueue_quantum(th); 4964 } 4965 4966 /* 4967 * Call out to pthread, this sets up the thread, pulls in kevent structs 4968 * onto the stack, sets up the thread state and then returns to userspace. 4969 */ 4970 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START, 4971 proc_get_wqptr_fast(p), 0, 0, 0); 4972 4973 if (workq_thread_is_cooperative(uth) || workq_thread_is_permanently_bound(uth)) { 4974 thread_sched_call(th, NULL); 4975 } else { 4976 thread_sched_call(th, workq_sched_callback); 4977 } 4978 4979 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 4980 uth->uu_workq_thport, 0, setup_flags, upcall_flags); 4981 4982 __builtin_unreachable(); 4983 } 4984 4985 /** 4986 * A wrapper around workq_setup_and_run for permanently bound thread. 
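 * (i.e. a thread that stays bound to a single kqworkloop; cf. WORKQ_TR_FLAG_PERMANENT_BIND and workq_thread_is_permanently_bound().)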
4987 */ 4988 __attribute__((noreturn, noinline)) 4989 static void 4990 workq_bound_thread_setup_and_run(struct uthread *uth, int setup_flags) 4991 { 4992 struct workq_threadreq_s * kqr = uth->uu_kqr_bound; 4993 4994 uint32_t upcall_flags = (WQ_FLAG_THREAD_NEWSPI | 4995 WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT); 4996 if (workq_tr_is_overcommit(kqr->tr_flags)) { 4997 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT); 4998 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 4999 } 5000 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 5001 workq_setup_and_run(current_proc(), uth, setup_flags); 5002 __builtin_unreachable(); 5003 } 5004 5005 /** 5006 * A parked bound thread wakes up for the first time. 5007 */ 5008 __attribute__((noreturn, noinline)) 5009 static void 5010 workq_bound_thread_initialize_and_unpark_continue(void *parameter __unused, 5011 wait_result_t wr) 5012 { 5013 /* 5014 * Locking model for accessing uu_workq_flags: 5015 * 5016 * Concurrent access to uu_workq_flags is synchronized with the workq lock 5017 * until a thread gets permanently bound to a kqwl. After that, the kqlock 5018 * is used for subsequent synchronization. This gives us a significant 5019 * benefit by avoiding having to take a process-wide workq lock on every 5020 * wakeup of the bound thread. 5021 * This flip in locking model is tracked with the UT_WORKQ_PERMANENT_BIND flag. 5022 * 5023 * There is one more optimization we can perform for the window from when the 5024 * thread is awakened for running (i.e. THREAD_AWAKENED) until it parks. 5025 * During this window, we know the KQ_SLEEP bit is reset, so there should not 5026 * be any concurrent attempts to modify uu_workq_flags by 5027 * kqworkloop_bound_thread_wakeup because the thread is already "awake". 5028 * So we can safely access uu_workq_flags within this window without having 5029 * to take the kqlock. The KQ_SLEEP bit is later set by the bound thread under 5030 * the kqlock on its way to parking. 5031 */ 5032 struct uthread *uth = get_bsdthread_info(current_thread()); 5033 5034 if (__probable(wr == THREAD_AWAKENED)) { 5035 /* At most one flag. */ 5036 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING)) 5037 != (UT_WORKQ_RUNNING | UT_WORKQ_DYING)); 5038 5039 assert(workq_thread_is_permanently_bound(uth)); 5040 5041 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) { 5042 assert(uth->uu_workq_flags & UT_WORKQ_NEW); 5043 uth->uu_workq_flags &= ~UT_WORKQ_NEW; 5044 5045 struct workq_threadreq_s * kqr = uth->uu_kqr_bound; 5046 if (kqr->tr_work_interval) { 5047 kern_return_t kr; 5048 kr = kern_work_interval_explicit_join(get_machthread(uth), 5049 kqr->tr_work_interval); 5050 /* 5051 * The work interval join has to be performed from the context of 5052 * the current thread. If we fail here, we record the fact and 5053 * continue. 5054 * In the future, we could preflight-check that this join will 5055 * always succeed when the paired kqwl is configured; but, 5056 * for now, this should be a rare case (e.g. if invalid arguments 5057 * were passed to the join). 5058 */ 5059 if (kr == KERN_SUCCESS) { 5060 uth->uu_workq_flags |= UT_WORKQ_WORK_INTERVAL_JOINED; 5061 /* Thread and kqwl both have +1 ref on the work interval.
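 * The thread's reference is dropped again when it un-joins the work interval on its termination path (see the UT_WORKQ_WORK_INTERVAL_JOINED handling there).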
*/ 5062 } else { 5063 uth->uu_workq_flags |= UT_WORKQ_WORK_INTERVAL_FAILED; 5064 } 5065 } 5066 workq_thread_reset_cpupercent(kqr, uth); 5067 workq_bound_thread_setup_and_run(uth, WQ_SETUP_FIRST_USE); 5068 __builtin_unreachable(); 5069 } else { 5070 /* 5071 * The permanently bound kqworkloop is getting destroyed so we 5072 * are woken up to cleanly unbind ourselves from it and terminate. 5073 * See KQ_WORKLOOP_DESTROY -> workq_kern_bound_thread_wakeup. 5074 * 5075 * The actual full unbind happens from 5076 * uthread_cleanup -> kqueue_threadreq_unbind. 5077 */ 5078 assert(uth->uu_workq_flags & UT_WORKQ_DYING); 5079 } 5080 } else { 5081 /* 5082 * The process is getting terminated so we are woken up to die. 5083 * E.g. SIGKILL'd. 5084 */ 5085 assert(wr == THREAD_INTERRUPTED); 5086 /* 5087 * It is possible we started running as the process is aborted 5088 * due to termination; but, workq_kern_threadreq_permanent_bind 5089 * has not had a chance to bind us to the kqwl yet. 5090 * 5091 * We synchronize with it using workq lock. 5092 */ 5093 proc_t p = current_proc(); 5094 struct workqueue *wq = proc_get_wqptr_fast(p); 5095 workq_lock_spin(wq); 5096 assert(workq_thread_is_permanently_bound(uth)); 5097 workq_unlock(wq); 5098 5099 /* 5100 * We do the bind commit ourselves if workq_kern_threadreq_permanent_bind 5101 * has not done it for us yet so our state is aligned with what the 5102 * termination path below expects. 5103 */ 5104 kqueue_threadreq_bind_commit(p, get_machthread(uth)); 5105 } 5106 workq_kern_bound_thread_terminate(uth->uu_kqr_bound); 5107 __builtin_unreachable(); 5108 } 5109 5110 /** 5111 * A parked bound thread wakes up. Not the first time. 5112 */ 5113 __attribute__((noreturn, noinline)) 5114 static void 5115 workq_bound_thread_unpark_continue(void *parameter __unused, wait_result_t wr) 5116 { 5117 struct uthread *uth = get_bsdthread_info(current_thread()); 5118 assert(workq_thread_is_permanently_bound(uth)); 5119 5120 if (__probable(wr == THREAD_AWAKENED)) { 5121 /* At most one flag. */ 5122 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING)) 5123 != (UT_WORKQ_RUNNING | UT_WORKQ_DYING)); 5124 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) { 5125 workq_bound_thread_setup_and_run(uth, WQ_SETUP_NONE); 5126 } else { 5127 assert(uth->uu_workq_flags & UT_WORKQ_DYING); 5128 } 5129 } else { 5130 assert(wr == THREAD_INTERRUPTED); 5131 } 5132 workq_kern_bound_thread_terminate(uth->uu_kqr_bound); 5133 __builtin_unreachable(); 5134 } 5135 5136 #pragma mark misc 5137 5138 int 5139 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo) 5140 { 5141 struct workqueue *wq = proc_get_wqptr(p); 5142 int error = 0; 5143 int activecount; 5144 5145 if (wq == NULL) { 5146 return EINVAL; 5147 } 5148 5149 /* 5150 * This is sometimes called from interrupt context by the kperf sampler. 5151 * In that case, it's not safe to spin trying to take the lock since we 5152 * might already hold it. So, we just try-lock it and error out if it's 5153 * already held. Since this is just a debugging aid, and all our callers 5154 * are able to handle an error, that's fine. 
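 * (A failed try-lock is reported to the caller as EBUSY just below.)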
5155 */ 5156 bool locked = workq_lock_try(wq); 5157 if (!locked) { 5158 return EBUSY; 5159 } 5160 5161 wq_thactive_t act = _wq_thactive(wq); 5162 activecount = _wq_thactive_aggregate_downto_qos(wq, act, 5163 WORKQ_THREAD_QOS_MIN, NULL, NULL); 5164 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) { 5165 activecount++; 5166 } 5167 pwqinfo->pwq_nthreads = wq->wq_nthreads; 5168 pwqinfo->pwq_runthreads = activecount; 5169 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount; 5170 pwqinfo->pwq_state = 0; 5171 5172 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 5173 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT; 5174 } 5175 5176 if (wq->wq_nthreads >= wq_max_threads) { 5177 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT; 5178 } 5179 5180 uint64_t total_cooperative_threads; 5181 total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq); 5182 if ((total_cooperative_threads == wq_cooperative_queue_max_size(wq)) && 5183 workq_has_cooperative_thread_requests(wq)) { 5184 pwqinfo->pwq_state |= WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT; 5185 } 5186 5187 if (wq->wq_exceeded_active_constrained_thread_limit) { 5188 pwqinfo->pwq_state |= WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT; 5189 } 5190 5191 workq_unlock(wq); 5192 return error; 5193 } 5194 5195 boolean_t 5196 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total, 5197 boolean_t *exceeded_constrained) 5198 { 5199 proc_t p = v; 5200 struct proc_workqueueinfo pwqinfo; 5201 int err; 5202 5203 assert(p != NULL); 5204 assert(exceeded_total != NULL); 5205 assert(exceeded_constrained != NULL); 5206 5207 err = fill_procworkqueue(p, &pwqinfo); 5208 if (err) { 5209 return FALSE; 5210 } 5211 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) { 5212 return FALSE; 5213 } 5214 5215 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT); 5216 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT); 5217 5218 return TRUE; 5219 } 5220 5221 uint64_t 5222 workqueue_get_task_ss_flags_from_pwq_state_kdp(void * v) 5223 { 5224 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) == 5225 kTaskWqExceededConstrainedThreadLimit); 5226 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) == 5227 kTaskWqExceededTotalThreadLimit); 5228 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable); 5229 static_assert(((uint64_t)WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT << 34) == 5230 (uint64_t)kTaskWqExceededCooperativeThreadLimit); 5231 static_assert(((uint64_t)WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT << 34) == 5232 (uint64_t)kTaskWqExceededActiveConstrainedThreadLimit); 5233 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT | 5234 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT | 5235 WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT | 5236 WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT) == 0x1F); 5237 5238 if (v == NULL) { 5239 return 0; 5240 } 5241 5242 proc_t p = v; 5243 struct workqueue *wq = proc_get_wqptr(p); 5244 5245 if (wq == NULL || workq_lock_is_acquired_kdp(wq)) { 5246 return 0; 5247 } 5248 5249 uint64_t ss_flags = kTaskWqFlagsAvailable; 5250 5251 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 5252 ss_flags |= kTaskWqExceededConstrainedThreadLimit; 5253 } 5254 5255 if (wq->wq_nthreads >= wq_max_threads) { 5256 ss_flags |= kTaskWqExceededTotalThreadLimit; 5257 } 5258 5259 uint64_t total_cooperative_threads; 5260 total_cooperative_threads = workq_num_cooperative_threads_scheduled_to_qos_internal(wq, 5261 
WORKQ_THREAD_QOS_MIN); 5262 if ((total_cooperative_threads == wq_cooperative_queue_max_size(wq)) && 5263 workq_has_cooperative_thread_requests(wq)) { 5264 ss_flags |= kTaskWqExceededCooperativeThreadLimit; 5265 } 5266 5267 if (wq->wq_exceeded_active_constrained_thread_limit) { 5268 ss_flags |= kTaskWqExceededActiveConstrainedThreadLimit; 5269 } 5270 5271 return ss_flags; 5272 } 5273 5274 void 5275 workq_init(void) 5276 { 5277 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs, 5278 NSEC_PER_USEC, &wq_stalled_window.abstime); 5279 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs, 5280 NSEC_PER_USEC, &wq_reduce_pool_window.abstime); 5281 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs, 5282 NSEC_PER_USEC, &wq_max_timer_interval.abstime); 5283 5284 thread_deallocate_daemon_register_queue(&workq_deallocate_queue, 5285 workq_deallocate_queue_invoke); 5286 } 5287