1 /* 2 * Copyright (c) 2000-2020 Apple Inc. All rights reserved. 3 * 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ 5 * 6 * This file contains Original Code and/or Modifications of Original Code 7 * as defined in and that are subject to the Apple Public Source License 8 * Version 2.0 (the 'License'). You may not use this file except in 9 * compliance with the License. The rights granted to you under the License 10 * may not be used to create, or enable the creation or redistribution of, 11 * unlawful or unlicensed copies of an Apple operating system, or to 12 * circumvent, violate, or enable the circumvention or violation of, any 13 * terms of an Apple operating system software license agreement. 14 * 15 * Please obtain a copy of the License at 16 * http://www.opensource.apple.com/apsl/ and read it before using this file. 17 * 18 * The Original Code and all software distributed under the License are 19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 23 * Please see the License for the specific language governing rights and 24 * limitations under the License. 25 * 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ 27 */ 28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */ 29 30 #include <sys/cdefs.h> 31 32 #include <kern/assert.h> 33 #include <kern/ast.h> 34 #include <kern/clock.h> 35 #include <kern/cpu_data.h> 36 #include <kern/kern_types.h> 37 #include <kern/policy_internal.h> 38 #include <kern/processor.h> 39 #include <kern/sched_prim.h> /* for thread_exception_return */ 40 #include <kern/task.h> 41 #include <kern/thread.h> 42 #include <kern/thread_group.h> 43 #include <kern/zalloc.h> 44 #include <mach/kern_return.h> 45 #include <mach/mach_param.h> 46 #include <mach/mach_port.h> 47 #include <mach/mach_types.h> 48 #include <mach/mach_vm.h> 49 #include <mach/sync_policy.h> 50 #include <mach/task.h> 51 #include <mach/thread_act.h> /* for thread_resume */ 52 #include <mach/thread_policy.h> 53 #include <mach/thread_status.h> 54 #include <mach/vm_prot.h> 55 #include <mach/vm_statistics.h> 56 #include <machine/atomic.h> 57 #include <machine/machine_routines.h> 58 #include <machine/smp.h> 59 #include <vm/vm_map.h> 60 #include <vm/vm_protos.h> 61 62 #include <sys/eventvar.h> 63 #include <sys/kdebug.h> 64 #include <sys/kernel.h> 65 #include <sys/lock.h> 66 #include <sys/param.h> 67 #include <sys/proc_info.h> /* for fill_procworkqueue */ 68 #include <sys/proc_internal.h> 69 #include <sys/pthread_shims.h> 70 #include <sys/resourcevar.h> 71 #include <sys/signalvar.h> 72 #include <sys/sysctl.h> 73 #include <sys/sysproto.h> 74 #include <sys/systm.h> 75 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */ 76 77 #include <pthread/bsdthread_private.h> 78 #include <pthread/workqueue_syscalls.h> 79 #include <pthread/workqueue_internal.h> 80 #include <pthread/workqueue_trace.h> 81 82 #include <os/log.h> 83 84 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2; 85 static void workq_schedule_creator(proc_t p, struct workqueue *wq, 86 workq_kern_threadreq_flags_t flags); 87 88 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 89 workq_threadreq_t req); 90 91 static uint32_t workq_constrained_allowance(struct workqueue *wq, 92 thread_qos_t at_qos, struct uthread *uth, bool may_start_timer); 93 94 static bool 
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq); 95 96 static bool workq_thread_is_busy(uint64_t cur_ts, 97 _Atomic uint64_t *lastblocked_tsp); 98 99 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS; 100 101 static bool 102 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags); 103 104 static inline void 105 workq_lock_spin(struct workqueue *wq); 106 107 static inline void 108 workq_unlock(struct workqueue *wq); 109 110 #pragma mark globals 111 112 struct workq_usec_var { 113 uint32_t usecs; 114 uint64_t abstime; 115 }; 116 117 #define WORKQ_SYSCTL_USECS(var, init) \ 118 static struct workq_usec_var var = { .usecs = init }; \ 119 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \ 120 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \ 121 workq_sysctl_handle_usecs, "I", "") 122 123 static LCK_GRP_DECLARE(workq_lck_grp, "workq"); 124 os_refgrp_decl(static, workq_refgrp, "workq", NULL); 125 126 static ZONE_DEFINE(workq_zone_workqueue, "workq.wq", 127 sizeof(struct workqueue), ZC_NONE); 128 static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq", 129 sizeof(struct workq_threadreq_s), ZC_CACHING); 130 131 static struct mpsc_daemon_queue workq_deallocate_queue; 132 133 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS); 134 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS); 135 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS); 136 static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS; 137 static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8; 138 static uint32_t wq_init_constrained_limit = 1; 139 static uint16_t wq_death_max_load; 140 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS]; 141 142 /* 143 * This is not a hard limit but the max size we want to aim to hit across the 144 * entire cooperative pool. We can oversubscribe the pool due to non-cooperative 145 * workers and the max we will oversubscribe the pool by, is a total of 146 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS. 147 */ 148 static uint32_t wq_max_cooperative_threads; 149 150 static inline uint32_t 151 wq_cooperative_queue_max_size(struct workqueue *wq) 152 { 153 return wq->wq_cooperative_queue_has_limited_max_size ? 
1 : wq_max_cooperative_threads; 154 } 155 156 #pragma mark sysctls 157 158 static int 159 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS 160 { 161 #pragma unused(arg2) 162 struct workq_usec_var *v = arg1; 163 int error = sysctl_handle_int(oidp, &v->usecs, 0, req); 164 if (error || !req->newptr) { 165 return error; 166 } 167 clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC, 168 &v->abstime); 169 return 0; 170 } 171 172 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED, 173 &wq_max_threads, 0, ""); 174 175 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED, 176 &wq_max_constrained_threads, 0, ""); 177 178 static int 179 wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS 180 { 181 #pragma unused(arg1, arg2, oidp) 182 int input_pool_size = 0; 183 int changed; 184 int error = 0; 185 186 error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed); 187 if (error || !changed) { 188 return error; 189 } 190 191 #define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0 192 #define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1 193 /* Not available currently, but sysctl interface is designed to allow these 194 * extra parameters: 195 * WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all bucket) 196 * WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512] 197 */ 198 199 if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT 200 && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) { 201 error = EINVAL; 202 goto out; 203 } 204 205 proc_t p = req->p; 206 struct workqueue *wq = proc_get_wqptr(p); 207 208 if (wq != NULL) { 209 workq_lock_spin(wq); 210 if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) { 211 // Hackily enforce that the workqueue is still new (no requests or 212 // threads) 213 error = ENOTSUP; 214 } else { 215 wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS); 216 } 217 workq_unlock(wq); 218 } else { 219 /* This process has no workqueue, calling this syctl makes no sense */ 220 return ENOTSUP; 221 } 222 223 out: 224 return error; 225 } 226 227 SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads, 228 CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0, 229 wq_limit_cooperative_threads_for_proc, 230 "I", "Modify the max pool size of the cooperative pool"); 231 232 #pragma mark p_wqptr 233 234 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0) 235 236 static struct workqueue * 237 proc_get_wqptr_fast(struct proc *p) 238 { 239 return os_atomic_load(&p->p_wqptr, relaxed); 240 } 241 242 struct workqueue * 243 proc_get_wqptr(struct proc *p) 244 { 245 struct workqueue *wq = proc_get_wqptr_fast(p); 246 return wq == WQPTR_IS_INITING_VALUE ? 
NULL : wq; 247 } 248 249 static void 250 proc_set_wqptr(struct proc *p, struct workqueue *wq) 251 { 252 wq = os_atomic_xchg(&p->p_wqptr, wq, release); 253 if (wq == WQPTR_IS_INITING_VALUE) { 254 proc_lock(p); 255 thread_wakeup(&p->p_wqptr); 256 proc_unlock(p); 257 } 258 } 259 260 static bool 261 proc_init_wqptr_or_wait(struct proc *p) 262 { 263 struct workqueue *wq; 264 265 proc_lock(p); 266 wq = os_atomic_load(&p->p_wqptr, relaxed); 267 268 if (wq == NULL) { 269 os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed); 270 proc_unlock(p); 271 return true; 272 } 273 274 if (wq == WQPTR_IS_INITING_VALUE) { 275 assert_wait(&p->p_wqptr, THREAD_UNINT); 276 proc_unlock(p); 277 thread_block(THREAD_CONTINUE_NULL); 278 } else { 279 proc_unlock(p); 280 } 281 return false; 282 } 283 284 static inline event_t 285 workq_parked_wait_event(struct uthread *uth) 286 { 287 return (event_t)&uth->uu_workq_stackaddr; 288 } 289 290 static inline void 291 workq_thread_wakeup(struct uthread *uth) 292 { 293 thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth)); 294 } 295 296 #pragma mark wq_thactive 297 298 #if defined(__LP64__) 299 // Layout is: 300 // 127 - 115 : 13 bits of zeroes 301 // 114 - 112 : best QoS among all pending constrained requests 302 // 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits 303 #define WQ_THACTIVE_BUCKET_WIDTH 16 304 #define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH) 305 #else 306 // Layout is: 307 // 63 - 61 : best QoS among all pending constrained requests 308 // 60 : Manager bucket (0 or 1) 309 // 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits 310 #define WQ_THACTIVE_BUCKET_WIDTH 10 311 #define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1) 312 #endif 313 #define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1) 314 #define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1)) 315 316 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3, 317 "Make sure we have space to encode a QoS"); 318 319 static inline wq_thactive_t 320 _wq_thactive(struct workqueue *wq) 321 { 322 return os_atomic_load_wide(&wq->wq_thactive, relaxed); 323 } 324 325 static inline uint8_t 326 _wq_bucket(thread_qos_t qos) 327 { 328 // Map both BG and MT to the same bucket by over-shifting down and 329 // clamping MT and BG together. 330 switch (qos) { 331 case THREAD_QOS_MAINTENANCE: 332 return 0; 333 default: 334 return qos - 2; 335 } 336 } 337 338 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \ 339 ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT)) 340 341 static inline thread_qos_t 342 _wq_thactive_best_constrained_req_qos(struct workqueue *wq) 343 { 344 // Avoid expensive atomic operations: the three bits we're loading are in 345 // a single byte, and always updated under the workqueue lock 346 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive; 347 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v); 348 } 349 350 static void 351 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq) 352 { 353 thread_qos_t old_qos, new_qos; 354 workq_threadreq_t req; 355 356 req = priority_queue_max(&wq->wq_constrained_queue, 357 struct workq_threadreq_s, tr_entry); 358 new_qos = req ? 
req->tr_qos : THREAD_QOS_UNSPECIFIED; 359 old_qos = _wq_thactive_best_constrained_req_qos(wq); 360 if (old_qos != new_qos) { 361 long delta = (long)new_qos - (long)old_qos; 362 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT; 363 /* 364 * We can do an atomic add relative to the initial load because updates 365 * to this qos are always serialized under the workqueue lock. 366 */ 367 v = os_atomic_add(&wq->wq_thactive, v, relaxed); 368 #ifdef __LP64__ 369 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v, 370 (uint64_t)(v >> 64), 0); 371 #else 372 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0); 373 #endif 374 } 375 } 376 377 static inline wq_thactive_t 378 _wq_thactive_offset_for_qos(thread_qos_t qos) 379 { 380 uint8_t bucket = _wq_bucket(qos); 381 __builtin_assume(bucket < WORKQ_NUM_BUCKETS); 382 return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH); 383 } 384 385 static inline wq_thactive_t 386 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos) 387 { 388 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 389 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed); 390 } 391 392 static inline wq_thactive_t 393 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos) 394 { 395 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 396 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed); 397 } 398 399 static inline void 400 _wq_thactive_move(struct workqueue *wq, 401 thread_qos_t old_qos, thread_qos_t new_qos) 402 { 403 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) - 404 _wq_thactive_offset_for_qos(old_qos); 405 os_atomic_add(&wq->wq_thactive, v, relaxed); 406 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--; 407 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++; 408 } 409 410 static inline uint32_t 411 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v, 412 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount) 413 { 414 uint32_t count = 0, active; 415 uint64_t curtime; 416 417 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX); 418 419 if (busycount) { 420 curtime = mach_absolute_time(); 421 *busycount = 0; 422 } 423 if (max_busycount) { 424 *max_busycount = THREAD_QOS_LAST - qos; 425 } 426 427 uint8_t i = _wq_bucket(qos); 428 v >>= i * WQ_THACTIVE_BUCKET_WIDTH; 429 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) { 430 active = v & WQ_THACTIVE_BUCKET_MASK; 431 count += active; 432 433 if (busycount && wq->wq_thscheduled_count[i] > active) { 434 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) { 435 /* 436 * We only consider the last blocked thread for a given bucket 437 * as busy because we don't want to take the list lock in each 438 * sched callback. However this is an approximation that could 439 * contribute to thread creation storms. 
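 *
 * Rough illustration (counts assumed): if a bucket shows
 * wq_thscheduled_count == 3 but only 2 threads are active in wq_thactive,
 * at least one thread is blocked; it is added to *busycount only when that
 * bucket's wq_lastblocked_ts is within wq_stalled_window of curtime, and
 * at most one blocked thread per bucket is ever counted.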
440 */ 441 (*busycount)++; 442 } 443 } 444 } 445 446 return count; 447 } 448 449 /* The input qos here should be the requested QoS of the thread, not accounting 450 * for any overrides */ 451 static inline void 452 _wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos) 453 { 454 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--; 455 assert(old_scheduled_count > 0); 456 } 457 458 /* The input qos here should be the requested QoS of the thread, not accounting 459 * for any overrides */ 460 static inline void 461 _wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos) 462 { 463 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++; 464 assert(old_scheduled_count < UINT8_MAX); 465 } 466 467 #pragma mark wq_flags 468 469 static inline uint32_t 470 _wq_flags(struct workqueue *wq) 471 { 472 return os_atomic_load(&wq->wq_flags, relaxed); 473 } 474 475 static inline bool 476 _wq_exiting(struct workqueue *wq) 477 { 478 return _wq_flags(wq) & WQ_EXITING; 479 } 480 481 bool 482 workq_is_exiting(struct proc *p) 483 { 484 struct workqueue *wq = proc_get_wqptr(p); 485 return !wq || _wq_exiting(wq); 486 } 487 488 489 #pragma mark workqueue lock 490 491 static bool 492 workq_lock_is_acquired_kdp(struct workqueue *wq) 493 { 494 return kdp_lck_ticket_is_acquired(&wq->wq_lock); 495 } 496 497 static inline void 498 workq_lock_spin(struct workqueue *wq) 499 { 500 lck_ticket_lock(&wq->wq_lock, &workq_lck_grp); 501 } 502 503 static inline void 504 workq_lock_held(struct workqueue *wq) 505 { 506 LCK_TICKET_ASSERT_OWNED(&wq->wq_lock); 507 } 508 509 static inline bool 510 workq_lock_try(struct workqueue *wq) 511 { 512 return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp); 513 } 514 515 static inline void 516 workq_unlock(struct workqueue *wq) 517 { 518 lck_ticket_unlock(&wq->wq_lock); 519 } 520 521 #pragma mark idle thread lists 522 523 #define WORKQ_POLICY_INIT(qos) \ 524 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos } 525 526 static inline thread_qos_t 527 workq_pri_bucket(struct uu_workq_policy req) 528 { 529 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override); 530 } 531 532 static inline thread_qos_t 533 workq_pri_override(struct uu_workq_policy req) 534 { 535 return MAX(workq_pri_bucket(req), req.qos_bucket); 536 } 537 538 static inline bool 539 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth) 540 { 541 workq_threadreq_param_t cur_trp, req_trp = { }; 542 543 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params; 544 if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 545 req_trp = kqueue_threadreq_workloop_param(req); 546 } 547 548 /* 549 * CPU percent flags are handled separately to policy changes, so ignore 550 * them for all of these checks. 
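 *
 * For instance, a current value of (TRP_PRIORITY | TRP_CPUPERCENT) and a
 * requested value of plain TRP_PRIORITY look identical to the flag
 * comparison below; the CPU percent difference is applied separately by
 * workq_thread_reset_cpupercent().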
551 */ 552 uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT); 553 uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT); 554 555 if (!req_flags && !cur_flags) { 556 return false; 557 } 558 559 if (req_flags != cur_flags) { 560 return true; 561 } 562 563 if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) { 564 return true; 565 } 566 567 if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) { 568 return true; 569 } 570 571 return false; 572 } 573 574 static inline bool 575 workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth) 576 { 577 if (workq_thread_needs_params_change(req, uth)) { 578 return true; 579 } 580 581 if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) { 582 return true; 583 } 584 585 #if CONFIG_PREADOPT_TG 586 thread_group_qos_t tg = kqr_preadopt_thread_group(req); 587 if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) { 588 /* 589 * Ideally, we'd add check here to see if thread's preadopt TG is same 590 * as the thread requests's thread group and short circuit if that is 591 * the case. But in the interest of keeping the code clean and not 592 * taking the thread lock here, we're going to skip this. We will 593 * eventually shortcircuit once we try to set the preadoption thread 594 * group on the thread. 595 */ 596 return true; 597 } 598 #endif 599 600 return false; 601 } 602 603 /* Input thread must be self. Called during self override, resetting overrides 604 * or while processing kevents 605 * 606 * Called with workq lock held. Sometimes also the thread mutex 607 */ 608 static void 609 workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth, 610 struct uu_workq_policy old_pri, struct uu_workq_policy new_pri, 611 bool force_run) 612 { 613 assert(uth == current_uthread()); 614 615 thread_qos_t old_bucket = old_pri.qos_bucket; 616 thread_qos_t new_bucket = workq_pri_bucket(new_pri); 617 618 if (old_bucket != new_bucket) { 619 _wq_thactive_move(wq, old_bucket, new_bucket); 620 } 621 622 new_pri.qos_bucket = new_bucket; 623 uth->uu_workq_pri = new_pri; 624 625 if (old_pri.qos_override != new_pri.qos_override) { 626 thread_set_workq_override(get_machthread(uth), new_pri.qos_override); 627 } 628 629 if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) { 630 int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS; 631 if (old_bucket > new_bucket) { 632 /* 633 * When lowering our bucket, we may unblock a thread request, 634 * but we can't drop our priority before we have evaluated 635 * whether this is the case, and if we ever drop the workqueue lock 636 * that would cause a priority inversion. 637 * 638 * We hence have to disallow thread creation in that case. 639 */ 640 flags = 0; 641 } 642 workq_schedule_creator(p, wq, flags); 643 } 644 } 645 646 /* 647 * Sets/resets the cpu percent limits on the current thread. We can't set 648 * these limits from outside of the current thread, so this function needs 649 * to be called when we're executing on the intended 650 */ 651 static void 652 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth) 653 { 654 assert(uth == current_uthread()); 655 workq_threadreq_param_t trp = { }; 656 657 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) { 658 trp = kqueue_threadreq_workloop_param(req); 659 } 660 661 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) { 662 /* 663 * Going through disable when we have an existing CPU percent limit 664 * set will force the ledger to refill the token bucket of the current 665 * thread. 
Removing any penalty applied by previous thread use. 666 */ 667 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0); 668 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT; 669 } 670 671 if (trp.trp_flags & TRP_CPUPERCENT) { 672 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent, 673 (uint64_t)trp.trp_refillms * NSEC_PER_SEC); 674 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT; 675 } 676 } 677 678 /* Called with the workq lock held */ 679 static void 680 workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth, 681 workq_threadreq_t req, bool unpark) 682 { 683 thread_t th = get_machthread(uth); 684 thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP; 685 workq_threadreq_param_t trp = { }; 686 int priority = 31; 687 int policy = POLICY_TIMESHARE; 688 689 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) { 690 trp = kqueue_threadreq_workloop_param(req); 691 } 692 693 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos); 694 uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS; 695 696 if (unpark) { 697 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value; 698 // qos sent out to userspace (may differ from uu_workq_pri on param threads) 699 uth->uu_save.uus_workq_park_data.qos = qos; 700 } 701 702 if (qos == WORKQ_THREAD_QOS_MANAGER) { 703 uint32_t mgr_pri = wq->wq_event_manager_priority; 704 assert(trp.trp_value == 0); // manager qos and thread policy don't mix 705 706 if (_pthread_priority_has_sched_pri(mgr_pri)) { 707 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 708 thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri, 709 POLICY_TIMESHARE); 710 return; 711 } 712 713 qos = _pthread_priority_thread_qos(mgr_pri); 714 } else { 715 if (trp.trp_flags & TRP_PRIORITY) { 716 qos = THREAD_QOS_UNSPECIFIED; 717 priority = trp.trp_pri; 718 uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS; 719 } 720 721 if (trp.trp_flags & TRP_POLICY) { 722 policy = trp.trp_pol; 723 } 724 } 725 726 #if CONFIG_PREADOPT_TG 727 if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) { 728 /* 729 * We cannot safely read and borrow the reference from the kqwl since it 730 * can disappear from under us at any time due to the max-ing logic in 731 * kqueue_set_preadopted_thread_group. 732 * 733 * As such, we do the following dance: 734 * 735 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave 736 * behind with (NULL + QoS). At this point, we have the reference 737 * to the thread group from the kqwl. 738 * 2) Have the thread set the preadoption thread group on itself. 739 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to 740 * thread_group + QoS. ie we try to give the reference back to the kqwl. 741 * If we fail, that's because a higher QoS thread group was set on the 742 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to 743 * go back to (1). 
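 *
 * Rough sketch of the intended transitions (TG1, TG2 and the QoS bits are
 * illustrative): the kqwl holds (TG1 + QoS); step (1) swaps in (NULL + QoS)
 * and we now own the TG1 reference; step (2) makes TG1 the thread's
 * preadoption group; step (3) tries to restore (TG1 + QoS) on the kqwl.
 * If the kqwl has meanwhile become (TG2 + higher QoS), step (3) fails, we
 * drop our TG1 reference and restart from (1).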
744 */ 745 746 _Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req); 747 748 thread_group_qos_t old_tg, new_tg; 749 int ret = 0; 750 again: 751 ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, { 752 if (!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) { 753 os_atomic_rmw_loop_give_up(break); 754 } 755 756 /* 757 * Leave the QoS behind - kqueue_set_preadopted_thread_group will 758 * only modify it if there is a higher QoS thread group to attach 759 */ 760 new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK); 761 }); 762 763 if (ret) { 764 /* 765 * We successfully took the ref from the kqwl so set it on the 766 * thread now 767 */ 768 thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg)); 769 770 thread_group_qos_t thread_group_to_expect = new_tg; 771 thread_group_qos_t thread_group_to_set = old_tg; 772 773 os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, { 774 if (old_tg != thread_group_to_expect) { 775 /* 776 * There was an intervening write to the kqwl_preadopt_tg, 777 * and it has a higher QoS than what we are working with 778 * here. Abandon our current adopted thread group and redo 779 * the full dance 780 */ 781 thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set)); 782 os_atomic_rmw_loop_give_up(goto again); 783 } 784 785 new_tg = thread_group_to_set; 786 }); 787 } else { 788 /* Nothing valid on the kqwl, just clear what's on the thread */ 789 thread_set_preadopt_thread_group(th, NULL); 790 } 791 } else { 792 /* Not even a kqwl, clear what's on the thread */ 793 thread_set_preadopt_thread_group(th, NULL); 794 } 795 #endif 796 thread_set_workq_pri(th, qos, priority, policy); 797 } 798 799 /* 800 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held, 801 * every time a servicer is being told about a new max QoS. 802 */ 803 void 804 workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr) 805 { 806 struct uu_workq_policy old_pri, new_pri; 807 struct uthread *uth = current_uthread(); 808 struct workqueue *wq = proc_get_wqptr_fast(p); 809 thread_qos_t qos = kqr->tr_kq_qos_index; 810 811 if (uth->uu_workq_pri.qos_max == qos) { 812 return; 813 } 814 815 workq_lock_spin(wq); 816 old_pri = new_pri = uth->uu_workq_pri; 817 new_pri.qos_max = qos; 818 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 819 workq_unlock(wq); 820 } 821 822 #pragma mark idle threads accounting and handling 823 824 static inline struct uthread * 825 workq_oldest_killable_idle_thread(struct workqueue *wq) 826 { 827 struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head); 828 829 if (uth && !uth->uu_save.uus_workq_park_data.has_stack) { 830 uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry); 831 if (uth) { 832 assert(uth->uu_save.uus_workq_park_data.has_stack); 833 } 834 } 835 return uth; 836 } 837 838 static inline uint64_t 839 workq_kill_delay_for_idle_thread(struct workqueue *wq) 840 { 841 uint64_t delay = wq_reduce_pool_window.abstime; 842 uint16_t idle = wq->wq_thidlecount; 843 844 /* 845 * If we have less than wq_death_max_load threads, have a 5s timer. 846 * 847 * For the next wq_max_constrained_threads ones, decay linearly from 848 * from 5s to 50ms. 
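 *
 * Worked example (values assumed for illustration: wq_reduce_pool_window
 * of 5s, wq_max_constrained_threads of 64): the first idle thread past
 * wq_death_max_load gets 5s * 63 / 64, roughly 4.9s, the next one
 * 5s * 62 / 64, and so on; once idle exceeds wq_death_max_load by 63 or
 * more, the delay bottoms out at 5s / 64, roughly 78ms.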
849 */ 850 if (idle <= wq_death_max_load) { 851 return delay; 852 } 853 854 if (wq_max_constrained_threads > idle - wq_death_max_load) { 855 delay *= (wq_max_constrained_threads - (idle - wq_death_max_load)); 856 } 857 return delay / wq_max_constrained_threads; 858 } 859 860 static inline bool 861 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth, 862 uint64_t now) 863 { 864 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 865 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay; 866 } 867 868 static void 869 workq_death_call_schedule(struct workqueue *wq, uint64_t deadline) 870 { 871 uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed); 872 873 if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) { 874 return; 875 } 876 os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed); 877 878 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0); 879 880 /* 881 * <rdar://problem/13139182> Due to how long term timers work, the leeway 882 * can't be too short, so use 500ms which is long enough that we will not 883 * wake up the CPU for killing threads, but short enough that it doesn't 884 * fall into long-term timer list shenanigans. 885 */ 886 thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline, 887 wq_reduce_pool_window.abstime / 10, 888 THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND); 889 } 890 891 /* 892 * `decrement` is set to the number of threads that are no longer dying: 893 * - because they have been resuscitated just in time (workq_pop_idle_thread) 894 * - or have been killed (workq_thread_terminate). 895 */ 896 static void 897 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement) 898 { 899 struct uthread *uth; 900 901 assert(wq->wq_thdying_count >= decrement); 902 if ((wq->wq_thdying_count -= decrement) > 0) { 903 return; 904 } 905 906 if (wq->wq_thidlecount <= 1) { 907 return; 908 } 909 910 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) { 911 return; 912 } 913 914 uint64_t now = mach_absolute_time(); 915 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 916 917 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) { 918 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START, 919 wq, wq->wq_thidlecount, 0, 0); 920 wq->wq_thdying_count++; 921 uth->uu_workq_flags |= UT_WORKQ_DYING; 922 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) { 923 workq_thread_wakeup(uth); 924 } 925 return; 926 } 927 928 workq_death_call_schedule(wq, 929 uth->uu_save.uus_workq_park_data.idle_stamp + delay); 930 } 931 932 void 933 workq_thread_terminate(struct proc *p, struct uthread *uth) 934 { 935 struct workqueue *wq = proc_get_wqptr_fast(p); 936 937 workq_lock_spin(wq); 938 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry); 939 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 940 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END, 941 wq, wq->wq_thidlecount, 0, 0); 942 workq_death_policy_evaluate(wq, 1); 943 } 944 if (wq->wq_nthreads-- == wq_max_threads) { 945 /* 946 * We got under the thread limit again, which may have prevented 947 * thread creation from happening, redrive if there are pending requests 948 */ 949 if (wq->wq_reqcount) { 950 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 951 } 952 } 953 workq_unlock(wq); 954 955 thread_deallocate(get_machthread(uth)); 956 } 957 958 static void 959 workq_kill_old_threads_call(void *param0, void *param1 __unused) 960 { 961 struct workqueue *wq = param0; 962 963 workq_lock_spin(wq); 964 
WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0); 965 os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed); 966 workq_death_policy_evaluate(wq, 0); 967 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0); 968 workq_unlock(wq); 969 } 970 971 static struct uthread * 972 workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags, 973 bool *needs_wakeup) 974 { 975 struct uthread *uth; 976 977 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) { 978 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 979 } else { 980 uth = TAILQ_FIRST(&wq->wq_thnewlist); 981 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 982 } 983 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 984 985 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0); 986 uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags; 987 988 /* A thread is never woken up as part of the cooperative pool */ 989 assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0); 990 991 if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) { 992 wq->wq_constrained_threads_scheduled++; 993 } 994 wq->wq_threads_scheduled++; 995 wq->wq_thidlecount--; 996 997 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) { 998 uth->uu_workq_flags ^= UT_WORKQ_DYING; 999 workq_death_policy_evaluate(wq, 1); 1000 *needs_wakeup = false; 1001 } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) { 1002 *needs_wakeup = false; 1003 } else { 1004 *needs_wakeup = true; 1005 } 1006 return uth; 1007 } 1008 1009 /* 1010 * Called by thread_create_workq_waiting() during thread initialization, before 1011 * assert_wait, before the thread has been started. 1012 */ 1013 event_t 1014 workq_thread_init_and_wq_lock(task_t task, thread_t th) 1015 { 1016 struct uthread *uth = get_bsdthread_info(th); 1017 1018 uth->uu_workq_flags = UT_WORKQ_NEW; 1019 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY); 1020 uth->uu_workq_thport = MACH_PORT_NULL; 1021 uth->uu_workq_stackaddr = 0; 1022 uth->uu_workq_pthread_kill_allowed = 0; 1023 1024 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE); 1025 thread_reset_workq_qos(th, THREAD_QOS_LEGACY); 1026 1027 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task))); 1028 return workq_parked_wait_event(uth); 1029 } 1030 1031 /** 1032 * Try to add a new workqueue thread. 
1033 * 1034 * - called with workq lock held 1035 * - dropped and retaken around thread creation 1036 * - return with workq lock held 1037 */ 1038 static bool 1039 workq_add_new_idle_thread(proc_t p, struct workqueue *wq) 1040 { 1041 mach_vm_offset_t th_stackaddr; 1042 kern_return_t kret; 1043 thread_t th; 1044 1045 wq->wq_nthreads++; 1046 1047 workq_unlock(wq); 1048 1049 vm_map_t vmap = get_task_map(proc_task(p)); 1050 1051 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr); 1052 if (kret != KERN_SUCCESS) { 1053 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 1054 kret, 1, 0); 1055 goto out; 1056 } 1057 1058 kret = thread_create_workq_waiting(proc_task(p), workq_unpark_continue, &th); 1059 if (kret != KERN_SUCCESS) { 1060 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 1061 kret, 0, 0); 1062 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr); 1063 goto out; 1064 } 1065 1066 // thread_create_workq_waiting() will return with the wq lock held 1067 // on success, because it calls workq_thread_init_and_wq_lock() above 1068 1069 struct uthread *uth = get_bsdthread_info(th); 1070 1071 wq->wq_creations++; 1072 wq->wq_thidlecount++; 1073 uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr; 1074 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry); 1075 1076 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0); 1077 return true; 1078 1079 out: 1080 workq_lock_spin(wq); 1081 /* 1082 * Do not redrive here if we went under wq_max_threads again, 1083 * it is the responsibility of the callers of this function 1084 * to do so when it fails. 1085 */ 1086 wq->wq_nthreads--; 1087 return false; 1088 } 1089 1090 static inline bool 1091 workq_thread_is_overcommit(struct uthread *uth) 1092 { 1093 return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0; 1094 } 1095 1096 static inline bool 1097 workq_thread_is_nonovercommit(struct uthread *uth) 1098 { 1099 return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0; 1100 } 1101 1102 static inline bool 1103 workq_thread_is_cooperative(struct uthread *uth) 1104 { 1105 return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0; 1106 } 1107 1108 static inline void 1109 workq_thread_set_type(struct uthread *uth, uint16_t flags) 1110 { 1111 uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE); 1112 uth->uu_workq_flags |= flags; 1113 } 1114 1115 1116 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1 1117 1118 __attribute__((noreturn, noinline)) 1119 static void 1120 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq, 1121 struct uthread *uth, uint32_t death_flags, uint32_t setup_flags) 1122 { 1123 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 1124 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW; 1125 1126 if (qos > WORKQ_THREAD_QOS_CLEANUP) { 1127 workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true); 1128 qos = WORKQ_THREAD_QOS_CLEANUP; 1129 } 1130 1131 workq_thread_reset_cpupercent(NULL, uth); 1132 1133 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) { 1134 wq->wq_thidlecount--; 1135 if (first_use) { 1136 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 1137 } else { 1138 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 1139 } 1140 } 1141 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 1142 1143 workq_unlock(wq); 1144 1145 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 1146 __assert_only kern_return_t kr; 1147 kr = thread_set_voucher_name(MACH_PORT_NULL); 1148 assert(kr == KERN_SUCCESS); 1149 } 1150 
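	/*
	 * Hand the dying thread back to userspace one last time;
	 * WQ_SETUP_EXIT_THREAD asks the pthread shim to let the thread
	 * unwind and exit instead of parking it again.
	 */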
1151 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS; 1152 thread_t th = get_machthread(uth); 1153 vm_map_t vmap = get_task_map(proc_task(p)); 1154 1155 if (!first_use) { 1156 flags |= WQ_FLAG_THREAD_REUSE; 1157 } 1158 1159 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 1160 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags); 1161 __builtin_unreachable(); 1162 } 1163 1164 bool 1165 workq_is_current_thread_updating_turnstile(struct workqueue *wq) 1166 { 1167 return wq->wq_turnstile_updater == current_thread(); 1168 } 1169 1170 __attribute__((always_inline)) 1171 static inline void 1172 workq_perform_turnstile_operation_locked(struct workqueue *wq, 1173 void (^operation)(void)) 1174 { 1175 workq_lock_held(wq); 1176 wq->wq_turnstile_updater = current_thread(); 1177 operation(); 1178 wq->wq_turnstile_updater = THREAD_NULL; 1179 } 1180 1181 static void 1182 workq_turnstile_update_inheritor(struct workqueue *wq, 1183 turnstile_inheritor_t inheritor, 1184 turnstile_update_flags_t flags) 1185 { 1186 if (wq->wq_inheritor == inheritor) { 1187 return; 1188 } 1189 wq->wq_inheritor = inheritor; 1190 workq_perform_turnstile_operation_locked(wq, ^{ 1191 turnstile_update_inheritor(wq->wq_turnstile, inheritor, 1192 flags | TURNSTILE_IMMEDIATE_UPDATE); 1193 turnstile_update_inheritor_complete(wq->wq_turnstile, 1194 TURNSTILE_INTERLOCK_HELD); 1195 }); 1196 } 1197 1198 static void 1199 workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth, 1200 uint32_t setup_flags) 1201 { 1202 uint64_t now = mach_absolute_time(); 1203 bool is_creator = (uth == wq->wq_creator); 1204 1205 if (workq_thread_is_cooperative(uth)) { 1206 assert(!is_creator); 1207 1208 thread_qos_t thread_qos = uth->uu_workq_pri.qos_req; 1209 _wq_cooperative_queue_scheduled_count_dec(wq, thread_qos); 1210 1211 /* Before we get here, we always go through 1212 * workq_select_threadreq_or_park_and_unlock. If we got here, it means 1213 * that we went through the logic in workq_threadreq_select which 1214 * did the refresh for the next best cooperative qos while 1215 * excluding the current thread - we shouldn't need to do it again. 
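 * (The assert that follows merely re-checks that the refresh is a no-op.)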
1216 */ 1217 assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false); 1218 } else if (workq_thread_is_nonovercommit(uth)) { 1219 assert(!is_creator); 1220 1221 wq->wq_constrained_threads_scheduled--; 1222 } 1223 1224 uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE); 1225 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry); 1226 wq->wq_threads_scheduled--; 1227 1228 if (is_creator) { 1229 wq->wq_creator = NULL; 1230 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0, 1231 uth->uu_save.uus_workq_park_data.yields); 1232 } 1233 1234 if (wq->wq_inheritor == get_machthread(uth)) { 1235 assert(wq->wq_creator == NULL); 1236 if (wq->wq_reqcount) { 1237 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ); 1238 } else { 1239 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 1240 } 1241 } 1242 1243 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 1244 assert(is_creator || (_wq_flags(wq) & WQ_EXITING)); 1245 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry); 1246 wq->wq_thidlecount++; 1247 return; 1248 } 1249 1250 if (!is_creator) { 1251 _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket); 1252 wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--; 1253 uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP; 1254 } 1255 1256 uth->uu_save.uus_workq_park_data.idle_stamp = now; 1257 1258 struct uthread *oldest = workq_oldest_killable_idle_thread(wq); 1259 uint16_t cur_idle = wq->wq_thidlecount; 1260 1261 if (cur_idle >= wq_max_constrained_threads || 1262 (wq->wq_thdying_count == 0 && oldest && 1263 workq_should_kill_idle_thread(wq, oldest, now))) { 1264 /* 1265 * Immediately kill threads if we have too many of them. 1266 * 1267 * And swap "place" with the oldest one we'd have woken up. 1268 * This is a relatively desperate situation where we really 1269 * need to kill threads quickly and it's better to kill 1270 * the one that's currently on core than to context switch.
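 *
 * Concretely: this thread takes the dying path below, while the oldest
 * killable idle thread gets a fresh idle_stamp and moves to the head of
 * the idle list so it is not reaped right behind us.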
1271 */ 1272 if (oldest) { 1273 oldest->uu_save.uus_workq_park_data.idle_stamp = now; 1274 TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry); 1275 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry); 1276 } 1277 1278 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START, 1279 wq, cur_idle, 0, 0); 1280 wq->wq_thdying_count++; 1281 uth->uu_workq_flags |= UT_WORKQ_DYING; 1282 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP; 1283 workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags); 1284 __builtin_unreachable(); 1285 } 1286 1287 struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head); 1288 1289 cur_idle += 1; 1290 wq->wq_thidlecount = cur_idle; 1291 1292 if (cur_idle >= wq_death_max_load && tail && 1293 tail->uu_save.uus_workq_park_data.has_stack) { 1294 uth->uu_save.uus_workq_park_data.has_stack = false; 1295 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry); 1296 } else { 1297 uth->uu_save.uus_workq_park_data.has_stack = true; 1298 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry); 1299 } 1300 1301 if (!tail) { 1302 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 1303 workq_death_call_schedule(wq, now + delay); 1304 } 1305 } 1306 1307 #pragma mark thread requests 1308 1309 static inline bool 1310 workq_tr_is_overcommit(workq_tr_flags_t tr_flags) 1311 { 1312 return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0; 1313 } 1314 1315 static inline bool 1316 workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags) 1317 { 1318 return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0; 1319 } 1320 1321 static inline bool 1322 workq_tr_is_cooperative(workq_tr_flags_t tr_flags) 1323 { 1324 return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0; 1325 } 1326 1327 #define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags) 1328 #define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags) 1329 #define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags) 1330 1331 static inline int 1332 workq_priority_for_req(workq_threadreq_t req) 1333 { 1334 thread_qos_t qos = req->tr_qos; 1335 1336 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 1337 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req); 1338 assert(trp.trp_flags & TRP_PRIORITY); 1339 return trp.trp_pri; 1340 } 1341 return thread_workq_pri_for_qos(qos); 1342 } 1343 1344 static inline struct priority_queue_sched_max * 1345 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req) 1346 { 1347 assert(!workq_tr_is_cooperative(req->tr_flags)); 1348 1349 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 1350 return &wq->wq_special_queue; 1351 } else if (workq_tr_is_overcommit(req->tr_flags)) { 1352 return &wq->wq_overcommit_queue; 1353 } else { 1354 return &wq->wq_constrained_queue; 1355 } 1356 } 1357 1358 1359 /* Calculates the number of threads scheduled >= the input QoS */ 1360 static uint64_t 1361 workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos) 1362 { 1363 workq_lock_held(wq); 1364 1365 uint64_t num_cooperative_threads = 0; 1366 1367 for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) { 1368 uint8_t bucket = _wq_bucket(cur_qos); 1369 num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket]; 1370 } 1371 1372 return num_cooperative_threads; 1373 } 1374 1375 static uint64_t 1376 workq_num_cooperative_threads_scheduled_total(struct workqueue *wq) 1377 { 1378 return 
workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN); 1379 } 1380 1381 #if DEBUG || DEVELOPMENT 1382 static bool 1383 workq_has_cooperative_thread_requests(struct workqueue *wq) 1384 { 1385 for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) { 1386 uint8_t bucket = _wq_bucket(qos); 1387 if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) { 1388 return true; 1389 } 1390 } 1391 1392 return false; 1393 } 1394 #endif 1395 1396 /* 1397 * Determines the next QoS bucket we should service next in the cooperative 1398 * pool. This function will always return a QoS for cooperative pool as long as 1399 * there are requests to be serviced. 1400 * 1401 * Unlike the other thread pools, for the cooperative thread pool the schedule 1402 * counts for the various buckets in the pool affect the next best request for 1403 * it. 1404 * 1405 * This function is called in the following contexts: 1406 * 1407 * a) When determining the best thread QoS for cooperative bucket for the 1408 * creator/thread reuse 1409 * 1410 * b) Once (a) has happened and thread has bound to a thread request, figuring 1411 * out whether the next best request for this pool has changed so that creator 1412 * can be scheduled. 1413 * 1414 * Returns true if the cooperative queue's best qos changed from previous 1415 * value. 1416 */ 1417 static bool 1418 _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq) 1419 { 1420 workq_lock_held(wq); 1421 1422 thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos; 1423 1424 /* We determine the next best cooperative thread request based on the 1425 * following: 1426 * 1427 * 1. Take the MAX of the following: 1428 * a) Highest qos with pending TRs such that number of scheduled 1429 * threads so far with >= qos is < wq_max_cooperative_threads 1430 * b) Highest qos bucket with pending TRs but no scheduled threads for that bucket 1431 * 1432 * 2. If the result of (1) is UN, then we pick the highest priority amongst 1433 * pending thread requests in the pool. 
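 *
 * Rough example (limit and counts assumed): with a pool limit of 4,
 * scheduled counts of UI:3, IN:1, UT:0 and pending requests at IN and UT,
 * rule (1a) rejects both IN and UT because 4 threads already run at or
 * above each of them, but rule (1b) picks UT since its bucket has pending
 * work and nothing servicing it; the refreshed best req QoS is UT.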
1434 * 1435 */ 1436 thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED; 1437 thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED; 1438 1439 thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED; 1440 1441 int scheduled_count_till_qos = 0; 1442 1443 for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) { 1444 uint8_t bucket = _wq_bucket(qos); 1445 uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket]; 1446 scheduled_count_till_qos += scheduled_count_for_bucket; 1447 1448 if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) { 1449 if (qos > highest_qos_req) { 1450 highest_qos_req = qos; 1451 } 1452 /* 1453 * The pool isn't saturated for threads at and above this QoS, and 1454 * this qos bucket has pending requests 1455 */ 1456 if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) { 1457 if (qos > highest_qos_req_with_width) { 1458 highest_qos_req_with_width = qos; 1459 } 1460 } 1461 1462 /* 1463 * There are no threads scheduled for this bucket but there 1464 * is work pending, give it at least 1 thread 1465 */ 1466 if (scheduled_count_for_bucket == 0) { 1467 if (qos > highest_qos_with_no_scheduled) { 1468 highest_qos_with_no_scheduled = qos; 1469 } 1470 } 1471 } 1472 } 1473 1474 wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width); 1475 if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) { 1476 wq->wq_cooperative_queue_best_req_qos = highest_qos_req; 1477 } 1478 1479 #if DEBUG || DEVELOPMENT 1480 /* Assert that if we are showing up the next best req as UN, then there 1481 * actually is no thread request in the cooperative pool buckets */ 1482 if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) { 1483 assert(!workq_has_cooperative_thread_requests(wq)); 1484 } 1485 #endif 1486 1487 return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos; 1488 } 1489 1490 /* 1491 * Returns whether or not the input thread (or creator thread if uth is NULL) 1492 * should be allowed to work as part of the cooperative pool for the <input qos> 1493 * bucket. 1494 * 1495 * This function is called in a bunch of places: 1496 * a) Quantum expires for a thread and it is part of the cooperative pool 1497 * b) When trying to pick a thread request for the creator thread to 1498 * represent. 1499 * c) When a thread is trying to pick a thread request to actually bind to 1500 * and service. 1501 * 1502 * Called with workq lock held. 
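 *
 * When the calling thread is itself cooperative, its own scheduled count
 * is excluded for the duration of these checks so that it does not count
 * against the admission of its next request.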
1503 */ 1504 1505 #define WQ_COOPERATIVE_POOL_UNSATURATED 1 1506 #define WQ_COOPERATIVE_BUCKET_UNSERVICED 2 1507 #define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3 1508 1509 static bool 1510 workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth, 1511 bool may_start_timer) 1512 { 1513 workq_lock_held(wq); 1514 1515 bool exclude_thread_as_scheduled = false; 1516 bool passed_admissions = false; 1517 uint8_t bucket = _wq_bucket(qos); 1518 1519 if (uth && workq_thread_is_cooperative(uth)) { 1520 exclude_thread_as_scheduled = true; 1521 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req); 1522 } 1523 1524 /* 1525 * We have not saturated the pool yet, let this thread continue 1526 */ 1527 uint64_t total_cooperative_threads; 1528 total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq); 1529 if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) { 1530 passed_admissions = true; 1531 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, 1532 total_cooperative_threads, qos, passed_admissions, 1533 WQ_COOPERATIVE_POOL_UNSATURATED); 1534 goto out; 1535 } 1536 1537 /* 1538 * Without this thread, nothing is servicing the bucket which has pending 1539 * work 1540 */ 1541 uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket]; 1542 if (bucket_scheduled == 0 && 1543 !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) { 1544 passed_admissions = true; 1545 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, 1546 total_cooperative_threads, qos, passed_admissions, 1547 WQ_COOPERATIVE_BUCKET_UNSERVICED); 1548 goto out; 1549 } 1550 1551 /* 1552 * If number of threads at the QoS bucket >= input QoS exceeds the max we want 1553 * for the pool, deny this thread 1554 */ 1555 uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos); 1556 passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq)); 1557 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos, 1558 qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS); 1559 1560 if (!passed_admissions && may_start_timer) { 1561 workq_schedule_delayed_thread_creation(wq, 0); 1562 } 1563 1564 out: 1565 if (exclude_thread_as_scheduled) { 1566 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req); 1567 } 1568 return passed_admissions; 1569 } 1570 1571 /* 1572 * returns true if the best request for the pool changed as a result of 1573 * enqueuing this thread request. 
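 *
 * A true return tells the caller that the creator may need to be
 * re-evaluated, mirroring the dequeue path below.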
1574 */ 1575 static bool 1576 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req) 1577 { 1578 assert(req->tr_state == WORKQ_TR_STATE_NEW); 1579 1580 req->tr_state = WORKQ_TR_STATE_QUEUED; 1581 wq->wq_reqcount += req->tr_count; 1582 1583 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 1584 assert(wq->wq_event_manager_threadreq == NULL); 1585 assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT); 1586 assert(req->tr_count == 1); 1587 wq->wq_event_manager_threadreq = req; 1588 return true; 1589 } 1590 1591 if (workq_threadreq_is_cooperative(req)) { 1592 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 1593 assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI); 1594 1595 struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)]; 1596 STAILQ_INSERT_TAIL(bucket, req, tr_link); 1597 1598 return _wq_cooperative_queue_refresh_best_req_qos(wq); 1599 } 1600 1601 struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req); 1602 1603 priority_queue_entry_set_sched_pri(q, &req->tr_entry, 1604 workq_priority_for_req(req), false); 1605 1606 if (priority_queue_insert(q, &req->tr_entry)) { 1607 if (workq_threadreq_is_nonovercommit(req)) { 1608 _wq_thactive_refresh_best_constrained_req_qos(wq); 1609 } 1610 return true; 1611 } 1612 return false; 1613 } 1614 1615 /* 1616 * returns true if one of the following is true (so as to update creator if 1617 * needed): 1618 * 1619 * (a) the next highest request of the pool we dequeued the request from changed 1620 * (b) the next highest requests of the pool the current thread used to be a 1621 * part of, changed 1622 * 1623 * For overcommit, special and constrained pools, the next highest QoS for each 1624 * pool just a MAX of pending requests so tracking (a) is sufficient. 1625 * 1626 * But for cooperative thread pool, the next highest QoS for the pool depends on 1627 * schedule counts in the pool as well. So if the current thread used to be 1628 * cooperative in it's previous logical run ie (b), then that can also affect 1629 * cooperative pool's next best QoS requests. 1630 */ 1631 static bool 1632 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req, 1633 bool cooperative_sched_count_changed) 1634 { 1635 wq->wq_reqcount--; 1636 1637 bool next_highest_request_changed = false; 1638 1639 if (--req->tr_count == 0) { 1640 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 1641 assert(wq->wq_event_manager_threadreq == req); 1642 assert(req->tr_count == 0); 1643 wq->wq_event_manager_threadreq = NULL; 1644 1645 /* If a cooperative thread was the one which picked up the manager 1646 * thread request, we need to reevaluate the cooperative pool 1647 * anyways. 1648 */ 1649 if (cooperative_sched_count_changed) { 1650 _wq_cooperative_queue_refresh_best_req_qos(wq); 1651 } 1652 return true; 1653 } 1654 1655 if (workq_threadreq_is_cooperative(req)) { 1656 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 1657 assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI); 1658 /* Account for the fact that BG and MT are coalesced when 1659 * calculating best request for cooperative pool 1660 */ 1661 assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos)); 1662 1663 struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)]; 1664 __assert_only workq_threadreq_t head = STAILQ_FIRST(bucket); 1665 1666 assert(head == req); 1667 STAILQ_REMOVE_HEAD(bucket, tr_link); 1668 1669 /* 1670 * If the request we're dequeueing is cooperative, then the sched 1671 * counts definitely changed. 
1672 */ 1673 assert(cooperative_sched_count_changed); 1674 } 1675 1676 /* 1677 * We want to do the cooperative pool refresh after dequeueing a 1678 * cooperative thread request if any (to combine both effects into 1 1679 * refresh operation) 1680 */ 1681 if (cooperative_sched_count_changed) { 1682 next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq); 1683 } 1684 1685 if (!workq_threadreq_is_cooperative(req)) { 1686 /* 1687 * All other types of requests are enqueued in priority queues 1688 */ 1689 1690 if (priority_queue_remove(workq_priority_queue_for_req(wq, req), 1691 &req->tr_entry)) { 1692 next_highest_request_changed |= true; 1693 if (workq_threadreq_is_nonovercommit(req)) { 1694 _wq_thactive_refresh_best_constrained_req_qos(wq); 1695 } 1696 } 1697 } 1698 } 1699 1700 return next_highest_request_changed; 1701 } 1702 1703 static void 1704 workq_threadreq_destroy(proc_t p, workq_threadreq_t req) 1705 { 1706 req->tr_state = WORKQ_TR_STATE_CANCELED; 1707 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) { 1708 kqueue_threadreq_cancel(p, req); 1709 } else { 1710 zfree(workq_zone_threadreq, req); 1711 } 1712 } 1713 1714 #pragma mark workqueue thread creation thread calls 1715 1716 static inline bool 1717 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend, 1718 uint32_t fail_mask) 1719 { 1720 uint32_t old_flags, new_flags; 1721 1722 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, { 1723 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) { 1724 os_atomic_rmw_loop_give_up(return false); 1725 } 1726 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) { 1727 new_flags = old_flags | pend; 1728 } else { 1729 new_flags = old_flags | sched; 1730 } 1731 }); 1732 1733 return (old_flags & WQ_PROC_SUSPENDED) == 0; 1734 } 1735 1736 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1 1737 1738 static bool 1739 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags) 1740 { 1741 assert(!preemption_enabled()); 1742 1743 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED, 1744 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED | 1745 WQ_IMMEDIATE_CALL_SCHEDULED)) { 1746 return false; 1747 } 1748 1749 uint64_t now = mach_absolute_time(); 1750 1751 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) { 1752 /* do not change the window */ 1753 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) { 1754 wq->wq_timer_interval *= 2; 1755 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) { 1756 wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime; 1757 } 1758 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) { 1759 wq->wq_timer_interval /= 2; 1760 if (wq->wq_timer_interval < wq_stalled_window.abstime) { 1761 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 1762 } 1763 } 1764 1765 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1766 _wq_flags(wq), wq->wq_timer_interval); 1767 1768 thread_call_t call = wq->wq_delayed_call; 1769 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED; 1770 uint64_t deadline = now + wq->wq_timer_interval; 1771 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) { 1772 panic("delayed_call was already enqueued"); 1773 } 1774 return true; 1775 } 1776 1777 static void 1778 workq_schedule_immediate_thread_creation(struct workqueue *wq) 1779 { 1780 assert(!preemption_enabled()); 1781 1782 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED, 1783 
WQ_IMMEDIATE_CALL_PENDED, 0)) { 1784 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1785 _wq_flags(wq), 0); 1786 1787 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED; 1788 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) { 1789 panic("immediate_call was already enqueued"); 1790 } 1791 } 1792 } 1793 1794 void 1795 workq_proc_suspended(struct proc *p) 1796 { 1797 struct workqueue *wq = proc_get_wqptr(p); 1798 1799 if (wq) { 1800 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed); 1801 } 1802 } 1803 1804 void 1805 workq_proc_resumed(struct proc *p) 1806 { 1807 struct workqueue *wq = proc_get_wqptr(p); 1808 uint32_t wq_flags; 1809 1810 if (!wq) { 1811 return; 1812 } 1813 1814 wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED | 1815 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed); 1816 if ((wq_flags & WQ_EXITING) == 0) { 1817 disable_preemption(); 1818 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) { 1819 workq_schedule_immediate_thread_creation(wq); 1820 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) { 1821 workq_schedule_delayed_thread_creation(wq, 1822 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART); 1823 } 1824 enable_preemption(); 1825 } 1826 } 1827 1828 /** 1829 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now 1830 */ 1831 static bool 1832 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp) 1833 { 1834 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed); 1835 if (now <= lastblocked_ts) { 1836 /* 1837 * Because the update of the timestamp when a thread blocks 1838 * isn't serialized against us looking at it (i.e. we don't hold 1839 * the workq lock), it's possible to have a timestamp that matches 1840 * the current time or that even looks to be in the future relative 1841 * to when we grabbed the current time... 1842 * 1843 * Just treat this as a busy thread since it must have just blocked. 1844 */ 1845 return true; 1846 } 1847 return (now - lastblocked_ts) < wq_stalled_window.abstime; 1848 } 1849 1850 static void 1851 workq_add_new_threads_call(void *_p, void *flags) 1852 { 1853 proc_t p = _p; 1854 struct workqueue *wq = proc_get_wqptr(p); 1855 uint32_t my_flag = (uint32_t)(uintptr_t)flags; 1856 1857 /* 1858 * workq_exit() will set the workqueue to NULL before 1859 * it cancels thread calls. 
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount);
}

#pragma mark thread state tracking

static void
workq_sched_callback(int type, thread_t thread)
{
	thread_ro_t tro = get_thread_ro(thread);
	struct uthread *uth = get_bsdthread_info(thread);
	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket; it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem. Either timestamp is adequate, so there is no need
		 * to retry.
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread, as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
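			 *
			 * A worked instance of the check below (numbers illustrative):
			 * with conc = wq_max_parallelism[IN] = 4, old_req_count = 3 and
			 * max_busycount = 2, the test 3 <= 4 <= 5 passes, so the thread
			 * that just blocked may well have been the reason admission was
			 * refused, and we schedule the delayed thread call to redrive.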
1943 */ 1944 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)]; 1945 if (old_req_count <= conc && conc <= old_req_count + max_busycount) { 1946 start_timer = workq_schedule_delayed_thread_creation(wq, 0); 1947 } 1948 } 1949 if (__improbable(kdebug_enable)) { 1950 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 1951 old_thactive, qos, NULL, NULL); 1952 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq, 1953 old - 1, qos | (req_qos << 8), 1954 wq->wq_reqcount << 1 | start_timer); 1955 } 1956 break; 1957 1958 case SCHED_CALL_UNBLOCK: 1959 /* 1960 * we cannot take the workqueue_lock here... 1961 * an UNBLOCK can occur from a timer event which 1962 * is run from an interrupt context... if the workqueue_lock 1963 * is already held by this processor, we'll deadlock... 1964 * the thread lock for the thread being UNBLOCKED 1965 * is also held 1966 */ 1967 old_thactive = _wq_thactive_inc(wq, qos); 1968 if (__improbable(kdebug_enable)) { 1969 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 1970 old_thactive, qos, NULL, NULL); 1971 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive); 1972 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq, 1973 old + 1, qos | (req_qos << 8), 1974 wq->wq_threads_scheduled); 1975 } 1976 break; 1977 } 1978 } 1979 1980 #pragma mark workq lifecycle 1981 1982 void 1983 workq_reference(struct workqueue *wq) 1984 { 1985 os_ref_retain(&wq->wq_refcnt); 1986 } 1987 1988 static void 1989 workq_deallocate_queue_invoke(mpsc_queue_chain_t e, 1990 __assert_only mpsc_daemon_queue_t dq) 1991 { 1992 struct workqueue *wq; 1993 struct turnstile *ts; 1994 1995 wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link); 1996 assert(dq == &workq_deallocate_queue); 1997 1998 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS); 1999 assert(ts); 2000 turnstile_cleanup(); 2001 turnstile_deallocate(ts); 2002 2003 lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp); 2004 zfree(workq_zone_workqueue, wq); 2005 } 2006 2007 static void 2008 workq_deallocate(struct workqueue *wq) 2009 { 2010 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) { 2011 workq_deallocate_queue_invoke(&wq->wq_destroy_link, 2012 &workq_deallocate_queue); 2013 } 2014 } 2015 2016 void 2017 workq_deallocate_safe(struct workqueue *wq) 2018 { 2019 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) { 2020 mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link, 2021 MPSC_QUEUE_DISABLE_PREEMPTION); 2022 } 2023 } 2024 2025 /** 2026 * Setup per-process state for the workqueue. 
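 *
 * Rough shape of the first-open path, using names from the body below: the
 * one-time global limits are computed (constrained-pool size derived from
 * ml_wait_max_cpus(), per-QoS wq_max_parallelism), then the struct workqueue
 * is allocated and its priority queues, cooperative STAILQs, turnstile and
 * thread calls are initialized before it is published with proc_set_wqptr().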
2027 */ 2028 int 2029 workq_open(struct proc *p, __unused struct workq_open_args *uap, 2030 __unused int32_t *retval) 2031 { 2032 struct workqueue *wq; 2033 int error = 0; 2034 2035 if ((p->p_lflag & P_LREGISTER) == 0) { 2036 return EINVAL; 2037 } 2038 2039 if (wq_init_constrained_limit) { 2040 uint32_t limit, num_cpus = ml_wait_max_cpus(); 2041 2042 /* 2043 * set up the limit for the constrained pool 2044 * this is a virtual pool in that we don't 2045 * maintain it on a separate idle and run list 2046 */ 2047 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR; 2048 2049 if (limit > wq_max_constrained_threads) { 2050 wq_max_constrained_threads = limit; 2051 } 2052 2053 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) { 2054 wq_max_threads = WQ_THACTIVE_BUCKET_HALF; 2055 } 2056 if (wq_max_threads > CONFIG_THREAD_MAX - 20) { 2057 wq_max_threads = CONFIG_THREAD_MAX - 20; 2058 } 2059 2060 wq_death_max_load = (uint16_t)fls(num_cpus) + 1; 2061 2062 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) { 2063 wq_max_parallelism[_wq_bucket(qos)] = 2064 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL); 2065 } 2066 2067 wq_max_cooperative_threads = num_cpus; 2068 2069 wq_init_constrained_limit = 0; 2070 } 2071 2072 if (proc_get_wqptr(p) == NULL) { 2073 if (proc_init_wqptr_or_wait(p) == FALSE) { 2074 assert(proc_get_wqptr(p) != NULL); 2075 goto out; 2076 } 2077 2078 wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO); 2079 2080 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1); 2081 2082 // Start the event manager at the priority hinted at by the policy engine 2083 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task()); 2084 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0); 2085 wq->wq_event_manager_priority = (uint32_t)pp; 2086 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 2087 wq->wq_proc = p; 2088 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(), 2089 TURNSTILE_WORKQS); 2090 2091 TAILQ_INIT(&wq->wq_thrunlist); 2092 TAILQ_INIT(&wq->wq_thnewlist); 2093 TAILQ_INIT(&wq->wq_thidlelist); 2094 priority_queue_init(&wq->wq_overcommit_queue); 2095 priority_queue_init(&wq->wq_constrained_queue); 2096 priority_queue_init(&wq->wq_special_queue); 2097 for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) { 2098 STAILQ_INIT(&wq->wq_cooperative_queue[bucket]); 2099 } 2100 2101 /* We are only using the delayed thread call for the constrained pool 2102 * which can't have work at >= UI QoS and so we can be fine with a 2103 * UI QoS thread call. 2104 */ 2105 wq->wq_delayed_call = thread_call_allocate_with_qos( 2106 workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE, 2107 THREAD_CALL_OPTIONS_ONCE); 2108 wq->wq_immediate_call = thread_call_allocate_with_options( 2109 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL, 2110 THREAD_CALL_OPTIONS_ONCE); 2111 wq->wq_death_call = thread_call_allocate_with_options( 2112 workq_kill_old_threads_call, wq, 2113 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE); 2114 2115 lck_ticket_init(&wq->wq_lock, &workq_lck_grp); 2116 2117 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq, 2118 VM_KERNEL_ADDRHIDE(wq), 0, 0); 2119 proc_set_wqptr(p, wq); 2120 } 2121 out: 2122 2123 return error; 2124 } 2125 2126 /* 2127 * Routine: workq_mark_exiting 2128 * 2129 * Function: Mark the work queue such that new threads will not be added to the 2130 * work queue after we return. 2131 * 2132 * Conditions: Called against the current process. 
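 *
 * Note:	Teardown order in the body below: WQ_EXITING is set under the
 *		workq lock, in-flight thread calls are cancelled opportunistically,
 *		the manager thread request is detached, and the priority queues
 *		are destroyed after dropping the lock, which is safe because
 *		nothing touches them once WQ_EXITING is set.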
2133 */ 2134 void 2135 workq_mark_exiting(struct proc *p) 2136 { 2137 struct workqueue *wq = proc_get_wqptr(p); 2138 uint32_t wq_flags; 2139 workq_threadreq_t mgr_req; 2140 2141 if (!wq) { 2142 return; 2143 } 2144 2145 WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0); 2146 2147 workq_lock_spin(wq); 2148 2149 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed); 2150 if (__improbable(wq_flags & WQ_EXITING)) { 2151 panic("workq_mark_exiting called twice"); 2152 } 2153 2154 /* 2155 * Opportunistically try to cancel thread calls that are likely in flight. 2156 * workq_exit() will do the proper cleanup. 2157 */ 2158 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) { 2159 thread_call_cancel(wq->wq_immediate_call); 2160 } 2161 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) { 2162 thread_call_cancel(wq->wq_delayed_call); 2163 } 2164 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) { 2165 thread_call_cancel(wq->wq_death_call); 2166 } 2167 2168 mgr_req = wq->wq_event_manager_threadreq; 2169 wq->wq_event_manager_threadreq = NULL; 2170 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */ 2171 wq->wq_creator = NULL; 2172 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 2173 2174 workq_unlock(wq); 2175 2176 if (mgr_req) { 2177 kqueue_threadreq_cancel(p, mgr_req); 2178 } 2179 /* 2180 * No one touches the priority queues once WQ_EXITING is set. 2181 * It is hence safe to do the tear down without holding any lock. 2182 */ 2183 priority_queue_destroy(&wq->wq_overcommit_queue, 2184 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2185 workq_threadreq_destroy(p, e); 2186 }); 2187 priority_queue_destroy(&wq->wq_constrained_queue, 2188 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2189 workq_threadreq_destroy(p, e); 2190 }); 2191 priority_queue_destroy(&wq->wq_special_queue, 2192 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 2193 workq_threadreq_destroy(p, e); 2194 }); 2195 2196 WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0); 2197 } 2198 2199 /* 2200 * Routine: workq_exit 2201 * 2202 * Function: clean up the work queue structure(s) now that there are no threads 2203 * left running inside the work queue (except possibly current_thread). 2204 * 2205 * Conditions: Called by the last thread in the process. 2206 * Called against current process. 2207 */ 2208 void 2209 workq_exit(struct proc *p) 2210 { 2211 struct workqueue *wq; 2212 struct uthread *uth, *tmp; 2213 2214 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed); 2215 if (wq != NULL) { 2216 thread_t th = current_thread(); 2217 2218 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0); 2219 2220 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) { 2221 /* 2222 * <rdar://problem/40111515> Make sure we will no longer call the 2223 * sched call, if we ever block this thread, which the cancel_wait 2224 * below can do. 2225 */ 2226 thread_sched_call(th, NULL); 2227 } 2228 2229 /* 2230 * Thread calls are always scheduled by the proc itself or under the 2231 * workqueue spinlock if WQ_EXITING is not yet set. 2232 * 2233 * Either way, when this runs, the proc has no threads left beside 2234 * the one running this very code, so we know no thread call can be 2235 * dispatched anymore. 
2236 */ 2237 thread_call_cancel_wait(wq->wq_delayed_call); 2238 thread_call_cancel_wait(wq->wq_immediate_call); 2239 thread_call_cancel_wait(wq->wq_death_call); 2240 thread_call_free(wq->wq_delayed_call); 2241 thread_call_free(wq->wq_immediate_call); 2242 thread_call_free(wq->wq_death_call); 2243 2244 /* 2245 * Clean up workqueue data structures for threads that exited and 2246 * didn't get a chance to clean up after themselves. 2247 * 2248 * idle/new threads should have been interrupted and died on their own 2249 */ 2250 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) { 2251 thread_t mth = get_machthread(uth); 2252 thread_sched_call(mth, NULL); 2253 thread_deallocate(mth); 2254 } 2255 assert(TAILQ_EMPTY(&wq->wq_thnewlist)); 2256 assert(TAILQ_EMPTY(&wq->wq_thidlelist)); 2257 2258 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq, 2259 VM_KERNEL_ADDRHIDE(wq), 0, 0); 2260 2261 workq_deallocate(wq); 2262 2263 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0); 2264 } 2265 } 2266 2267 2268 #pragma mark bsd thread control 2269 2270 bool 2271 bsdthread_part_of_cooperative_workqueue(struct uthread *uth) 2272 { 2273 return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) && 2274 (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER); 2275 } 2276 2277 static bool 2278 _pthread_priority_to_policy(pthread_priority_t priority, 2279 thread_qos_policy_data_t *data) 2280 { 2281 data->qos_tier = _pthread_priority_thread_qos(priority); 2282 data->tier_importance = _pthread_priority_relpri(priority); 2283 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 || 2284 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) { 2285 return false; 2286 } 2287 return true; 2288 } 2289 2290 static int 2291 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority, 2292 mach_port_name_t voucher, enum workq_set_self_flags flags) 2293 { 2294 struct uthread *uth = get_bsdthread_info(th); 2295 struct workqueue *wq = proc_get_wqptr(p); 2296 2297 kern_return_t kr; 2298 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0; 2299 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE); 2300 2301 assert(th == current_thread()); 2302 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) { 2303 if (!is_wq_thread) { 2304 unbind_rv = EINVAL; 2305 goto qos; 2306 } 2307 2308 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 2309 unbind_rv = EINVAL; 2310 goto qos; 2311 } 2312 2313 workq_threadreq_t kqr = uth->uu_kqr_bound; 2314 if (kqr == NULL) { 2315 unbind_rv = EALREADY; 2316 goto qos; 2317 } 2318 2319 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2320 unbind_rv = EINVAL; 2321 goto qos; 2322 } 2323 2324 kqueue_threadreq_unbind(p, kqr); 2325 } 2326 2327 qos: 2328 if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) { 2329 assert(flags & WORKQ_SET_SELF_QOS_FLAG); 2330 2331 thread_qos_policy_data_t new_policy; 2332 thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED; 2333 2334 if (!_pthread_priority_to_policy(priority, &new_policy)) { 2335 qos_rv = EINVAL; 2336 goto voucher; 2337 } 2338 2339 if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) { 2340 /* 2341 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely 2342 * should have an override QoS in the pthread_priority_t and we should 2343 * only come into this path for cooperative thread requests 2344 */ 2345 if (!_pthread_priority_has_override_qos(priority) || 2346 !_pthread_priority_is_cooperative(priority)) { 2347 qos_rv = EINVAL; 2348 goto voucher; 
2349 } 2350 qos_override = _pthread_priority_thread_override_qos(priority); 2351 } else { 2352 /* 2353 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely 2354 * should not have an override QoS in the pthread_priority_t 2355 */ 2356 if (_pthread_priority_has_override_qos(priority)) { 2357 qos_rv = EINVAL; 2358 goto voucher; 2359 } 2360 } 2361 2362 if (!is_wq_thread) { 2363 /* 2364 * Threads opted out of QoS can't change QoS 2365 */ 2366 if (!thread_has_qos_policy(th)) { 2367 qos_rv = EPERM; 2368 goto voucher; 2369 } 2370 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER || 2371 uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) { 2372 /* 2373 * Workqueue manager threads or threads above UI can't change QoS 2374 */ 2375 qos_rv = EINVAL; 2376 goto voucher; 2377 } else { 2378 /* 2379 * For workqueue threads, possibly adjust buckets and redrive thread 2380 * requests. 2381 * 2382 * Transitions allowed: 2383 * 2384 * overcommit --> non-overcommit 2385 * overcommit --> overcommit 2386 * non-overcommit --> non-overcommit 2387 * non-overcommit --> overcommit (to be deprecated later) 2388 * cooperative --> cooperative 2389 * 2390 * All other transitions aren't allowed so reject them. 2391 */ 2392 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) { 2393 qos_rv = EINVAL; 2394 goto voucher; 2395 } else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) { 2396 qos_rv = EINVAL; 2397 goto voucher; 2398 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) { 2399 qos_rv = EINVAL; 2400 goto voucher; 2401 } 2402 2403 struct uu_workq_policy old_pri, new_pri; 2404 bool force_run = false; 2405 2406 if (qos_override) { 2407 /* 2408 * We're in the case of a thread clarifying that it is for eg. not IN 2409 * req QoS but rather, UT req QoS with IN override. However, this can 2410 * race with a concurrent override happening to the thread via 2411 * workq_thread_add_dispatch_override so this needs to be 2412 * synchronized with the thread mutex. 2413 */ 2414 thread_mtx_lock(th); 2415 } 2416 2417 workq_lock_spin(wq); 2418 2419 old_pri = new_pri = uth->uu_workq_pri; 2420 new_pri.qos_req = (thread_qos_t)new_policy.qos_tier; 2421 2422 if (old_pri.qos_override < qos_override) { 2423 /* 2424 * Since this can race with a concurrent override via 2425 * workq_thread_add_dispatch_override, only adjust override value if we 2426 * are higher - this is a saturating function. 2427 * 2428 * We should not be changing the final override values, we should simply 2429 * be redistributing the current value with a different breakdown of req 2430 * vs override QoS - assert to that effect. Therefore, buckets should 2431 * not change. 
2432 */ 2433 new_pri.qos_override = qos_override; 2434 assert(workq_pri_override(new_pri) == workq_pri_override(old_pri)); 2435 assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri)); 2436 } 2437 2438 /* Adjust schedule counts for various types of transitions */ 2439 2440 /* overcommit -> non-overcommit */ 2441 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) { 2442 workq_thread_set_type(uth, 0); 2443 wq->wq_constrained_threads_scheduled++; 2444 2445 /* non-overcommit -> overcommit */ 2446 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) { 2447 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT); 2448 force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads); 2449 2450 /* cooperative -> cooperative */ 2451 } else if (workq_thread_is_cooperative(uth)) { 2452 _wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req); 2453 _wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req); 2454 2455 /* We're changing schedule counts within cooperative pool, we 2456 * need to refresh best cooperative QoS logic again */ 2457 force_run = _wq_cooperative_queue_refresh_best_req_qos(wq); 2458 } 2459 2460 /* 2461 * This will set up an override on the thread if any and will also call 2462 * schedule_creator if needed 2463 */ 2464 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run); 2465 workq_unlock(wq); 2466 2467 if (qos_override) { 2468 thread_mtx_unlock(th); 2469 } 2470 2471 if (workq_thread_is_overcommit(uth)) { 2472 thread_disarm_workqueue_quantum(th); 2473 } else { 2474 /* If the thread changed QoS buckets, the quantum duration 2475 * may have changed too */ 2476 thread_arm_workqueue_quantum(th); 2477 } 2478 } 2479 2480 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY, 2481 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT); 2482 if (kr != KERN_SUCCESS) { 2483 qos_rv = EINVAL; 2484 } 2485 } 2486 2487 voucher: 2488 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) { 2489 kr = thread_set_voucher_name(voucher); 2490 if (kr != KERN_SUCCESS) { 2491 voucher_rv = ENOENT; 2492 goto fixedpri; 2493 } 2494 } 2495 2496 fixedpri: 2497 if (qos_rv) { 2498 goto done; 2499 } 2500 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) { 2501 thread_extended_policy_data_t extpol = {.timeshare = 0}; 2502 2503 if (is_wq_thread) { 2504 /* Not allowed on workqueue threads */ 2505 fixedpri_rv = ENOTSUP; 2506 goto done; 2507 } 2508 2509 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 2510 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 2511 if (kr != KERN_SUCCESS) { 2512 fixedpri_rv = EINVAL; 2513 goto done; 2514 } 2515 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) { 2516 thread_extended_policy_data_t extpol = {.timeshare = 1}; 2517 2518 if (is_wq_thread) { 2519 /* Not allowed on workqueue threads */ 2520 fixedpri_rv = ENOTSUP; 2521 goto done; 2522 } 2523 2524 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 2525 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 2526 if (kr != KERN_SUCCESS) { 2527 fixedpri_rv = EINVAL; 2528 goto done; 2529 } 2530 } 2531 2532 done: 2533 if (qos_rv && voucher_rv) { 2534 /* Both failed, give that a unique error. 
*/ 2535 return EBADMSG; 2536 } 2537 2538 if (unbind_rv) { 2539 return unbind_rv; 2540 } 2541 2542 if (qos_rv) { 2543 return qos_rv; 2544 } 2545 2546 if (voucher_rv) { 2547 return voucher_rv; 2548 } 2549 2550 if (fixedpri_rv) { 2551 return fixedpri_rv; 2552 } 2553 2554 2555 return 0; 2556 } 2557 2558 static int 2559 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport, 2560 pthread_priority_t pp, user_addr_t resource) 2561 { 2562 thread_qos_t qos = _pthread_priority_thread_qos(pp); 2563 if (qos == THREAD_QOS_UNSPECIFIED) { 2564 return EINVAL; 2565 } 2566 2567 thread_t th = port_name_to_thread(kport, 2568 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2569 if (th == THREAD_NULL) { 2570 return ESRCH; 2571 } 2572 2573 int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE, 2574 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 2575 2576 thread_deallocate(th); 2577 return rv; 2578 } 2579 2580 static int 2581 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport, 2582 user_addr_t resource) 2583 { 2584 thread_t th = port_name_to_thread(kport, 2585 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2586 if (th == THREAD_NULL) { 2587 return ESRCH; 2588 } 2589 2590 int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource, 2591 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 2592 2593 thread_deallocate(th); 2594 return rv; 2595 } 2596 2597 static int 2598 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport, 2599 pthread_priority_t pp, user_addr_t ulock_addr) 2600 { 2601 struct uu_workq_policy old_pri, new_pri; 2602 struct workqueue *wq = proc_get_wqptr(p); 2603 2604 thread_qos_t qos_override = _pthread_priority_thread_qos(pp); 2605 if (qos_override == THREAD_QOS_UNSPECIFIED) { 2606 return EINVAL; 2607 } 2608 2609 thread_t thread = port_name_to_thread(kport, 2610 PORT_INTRANS_THREAD_IN_CURRENT_TASK); 2611 if (thread == THREAD_NULL) { 2612 return ESRCH; 2613 } 2614 2615 struct uthread *uth = get_bsdthread_info(thread); 2616 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 2617 thread_deallocate(thread); 2618 return EPERM; 2619 } 2620 2621 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE, 2622 wq, thread_tid(thread), 1, pp); 2623 2624 thread_mtx_lock(thread); 2625 2626 if (ulock_addr) { 2627 uint32_t val; 2628 int rc; 2629 /* 2630 * Workaround lack of explicit support for 'no-fault copyin' 2631 * <rdar://problem/24999882>, as disabling preemption prevents paging in 2632 */ 2633 disable_preemption(); 2634 rc = copyin_atomic32(ulock_addr, &val); 2635 enable_preemption(); 2636 if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) { 2637 goto out; 2638 } 2639 } 2640 2641 workq_lock_spin(wq); 2642 2643 old_pri = uth->uu_workq_pri; 2644 if (old_pri.qos_override >= qos_override) { 2645 /* Nothing to do */ 2646 } else if (thread == current_thread()) { 2647 new_pri = old_pri; 2648 new_pri.qos_override = qos_override; 2649 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 2650 } else { 2651 uth->uu_workq_pri.qos_override = qos_override; 2652 if (qos_override > workq_pri_override(old_pri)) { 2653 thread_set_workq_override(thread, qos_override); 2654 } 2655 } 2656 2657 workq_unlock(wq); 2658 2659 out: 2660 thread_mtx_unlock(thread); 2661 thread_deallocate(thread); 2662 return 0; 2663 } 2664 2665 static int 2666 workq_thread_reset_dispatch_override(proc_t p, thread_t thread) 2667 { 2668 struct uu_workq_policy old_pri, new_pri; 2669 struct workqueue *wq = proc_get_wqptr(p); 2670 struct uthread *uth = 
get_bsdthread_info(thread); 2671 2672 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 2673 return EPERM; 2674 } 2675 2676 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0); 2677 2678 /* 2679 * workq_thread_add_dispatch_override takes the thread mutex before doing the 2680 * copyin to validate the drainer and apply the override. We need to do the 2681 * same here. See rdar://84472518 2682 */ 2683 thread_mtx_lock(thread); 2684 2685 workq_lock_spin(wq); 2686 old_pri = new_pri = uth->uu_workq_pri; 2687 new_pri.qos_override = THREAD_QOS_UNSPECIFIED; 2688 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 2689 workq_unlock(wq); 2690 2691 thread_mtx_unlock(thread); 2692 return 0; 2693 } 2694 2695 static int 2696 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable) 2697 { 2698 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) { 2699 // If the thread isn't a workqueue thread, don't set the 2700 // kill_allowed bit; however, we still need to return 0 2701 // instead of an error code since this code is executed 2702 // on the abort path which needs to not depend on the 2703 // pthread_t (returning an error depends on pthread_t via 2704 // cerror_nocancel) 2705 return 0; 2706 } 2707 struct uthread *uth = get_bsdthread_info(thread); 2708 uth->uu_workq_pthread_kill_allowed = enable; 2709 return 0; 2710 } 2711 2712 static int 2713 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags, 2714 int *retval) 2715 { 2716 static_assert(QOS_PARALLELISM_COUNT_LOGICAL == 2717 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical"); 2718 static_assert(QOS_PARALLELISM_REALTIME == 2719 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime"); 2720 static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE == 2721 _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource"); 2722 2723 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) { 2724 return EINVAL; 2725 } 2726 2727 /* No units are present */ 2728 if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) { 2729 return ENOTSUP; 2730 } 2731 2732 if (flags & QOS_PARALLELISM_REALTIME) { 2733 if (qos) { 2734 return EINVAL; 2735 } 2736 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) { 2737 return EINVAL; 2738 } 2739 2740 *retval = qos_max_parallelism(qos, flags); 2741 return 0; 2742 } 2743 2744 static int 2745 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread, 2746 unsigned long flags, uint64_t value1, __unused uint64_t value2) 2747 { 2748 uint32_t apply_worker_index; 2749 kern_return_t kr; 2750 2751 switch (flags) { 2752 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET: 2753 apply_worker_index = (uint32_t)value1; 2754 kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH); 2755 /* 2756 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a 2757 * cluster which it was not eligible to execute on. 2758 */ 2759 return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL); 2760 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR: 2761 kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH); 2762 return (kr == KERN_SUCCESS) ? 
0 : EINVAL; 2763 default: 2764 return EINVAL; 2765 } 2766 } 2767 2768 #define ENSURE_UNUSED(arg) \ 2769 ({ if ((arg) != 0) { return EINVAL; } }) 2770 2771 int 2772 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval) 2773 { 2774 switch (uap->cmd) { 2775 case BSDTHREAD_CTL_QOS_OVERRIDE_START: 2776 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1, 2777 (pthread_priority_t)uap->arg2, uap->arg3); 2778 case BSDTHREAD_CTL_QOS_OVERRIDE_END: 2779 ENSURE_UNUSED(uap->arg3); 2780 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1, 2781 (user_addr_t)uap->arg2); 2782 2783 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH: 2784 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1, 2785 (pthread_priority_t)uap->arg2, uap->arg3); 2786 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET: 2787 return workq_thread_reset_dispatch_override(p, current_thread()); 2788 2789 case BSDTHREAD_CTL_SET_SELF: 2790 return bsdthread_set_self(p, current_thread(), 2791 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2, 2792 (enum workq_set_self_flags)uap->arg3); 2793 2794 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM: 2795 ENSURE_UNUSED(uap->arg3); 2796 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1, 2797 (unsigned long)uap->arg2, retval); 2798 case BSDTHREAD_CTL_WORKQ_ALLOW_KILL: 2799 ENSURE_UNUSED(uap->arg2); 2800 ENSURE_UNUSED(uap->arg3); 2801 return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1); 2802 case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR: 2803 return bsdthread_dispatch_apply_attr(p, current_thread(), 2804 (unsigned long)uap->arg1, (uint64_t)uap->arg2, 2805 (uint64_t)uap->arg3); 2806 case BSDTHREAD_CTL_SET_QOS: 2807 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD: 2808 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET: 2809 /* no longer supported */ 2810 return ENOTSUP; 2811 2812 default: 2813 return EINVAL; 2814 } 2815 } 2816 2817 #pragma mark workqueue thread manipulation 2818 2819 static void __dead2 2820 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2821 struct uthread *uth, uint32_t setup_flags); 2822 2823 static void __dead2 2824 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2825 struct uthread *uth, uint32_t setup_flags); 2826 2827 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2; 2828 2829 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD 2830 static inline uint64_t 2831 workq_trace_req_id(workq_threadreq_t req) 2832 { 2833 struct kqworkloop *kqwl; 2834 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2835 kqwl = __container_of(req, struct kqworkloop, kqwl_request); 2836 return kqwl->kqwl_dynamicid; 2837 } 2838 2839 return VM_KERNEL_ADDRHIDE(req); 2840 } 2841 #endif 2842 2843 /** 2844 * Entry point for libdispatch to ask for threads 2845 */ 2846 static int 2847 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative) 2848 { 2849 thread_qos_t qos = _pthread_priority_thread_qos(pp); 2850 struct workqueue *wq = proc_get_wqptr(p); 2851 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI; 2852 int ret = 0; 2853 2854 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX || 2855 qos == THREAD_QOS_UNSPECIFIED) { 2856 ret = EINVAL; 2857 goto exit; 2858 } 2859 2860 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE, 2861 wq, reqcount, pp, cooperative); 2862 2863 workq_threadreq_t req = zalloc(workq_zone_threadreq); 2864 priority_queue_entry_init(&req->tr_entry); 2865 req->tr_state = 
WORKQ_TR_STATE_NEW; 2866 req->tr_qos = qos; 2867 workq_tr_flags_t tr_flags = 0; 2868 2869 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) { 2870 tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT; 2871 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 2872 } 2873 2874 if (cooperative) { 2875 tr_flags |= WORKQ_TR_FLAG_COOPERATIVE; 2876 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE; 2877 2878 if (reqcount > 1) { 2879 ret = ENOTSUP; 2880 goto free_and_exit; 2881 } 2882 } 2883 2884 /* A thread request cannot be both overcommit and cooperative */ 2885 if (workq_tr_is_cooperative(tr_flags) && 2886 workq_tr_is_overcommit(tr_flags)) { 2887 ret = EINVAL; 2888 goto free_and_exit; 2889 } 2890 req->tr_flags = tr_flags; 2891 2892 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, 2893 wq, workq_trace_req_id(req), req->tr_qos, reqcount); 2894 2895 workq_lock_spin(wq); 2896 do { 2897 if (_wq_exiting(wq)) { 2898 goto unlock_and_exit; 2899 } 2900 2901 /* 2902 * When userspace is asking for parallelism, wakeup up to (reqcount - 1) 2903 * threads without pacing, to inform the scheduler of that workload. 2904 * 2905 * The last requests, or the ones that failed the admission checks are 2906 * enqueued and go through the regular creator codepath. 2907 * 2908 * If there aren't enough threads, add one, but re-evaluate everything 2909 * as conditions may now have changed. 2910 */ 2911 unpaced = reqcount - 1; 2912 2913 if (reqcount > 1) { 2914 /* We don't handle asking for parallelism on the cooperative 2915 * workqueue just yet */ 2916 assert(!workq_threadreq_is_cooperative(req)); 2917 2918 if (workq_threadreq_is_nonovercommit(req)) { 2919 unpaced = workq_constrained_allowance(wq, qos, NULL, false); 2920 if (unpaced >= reqcount - 1) { 2921 unpaced = reqcount - 1; 2922 } 2923 } 2924 } 2925 2926 /* 2927 * This path does not currently handle custom workloop parameters 2928 * when creating threads for parallelism. 
2929 */ 2930 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)); 2931 2932 /* 2933 * This is a trimmed down version of workq_threadreq_bind_and_unlock() 2934 */ 2935 while (unpaced > 0 && wq->wq_thidlecount) { 2936 struct uthread *uth; 2937 bool needs_wakeup; 2938 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND; 2939 2940 if (workq_tr_is_overcommit(req->tr_flags)) { 2941 uu_flags |= UT_WORKQ_OVERCOMMIT; 2942 } 2943 2944 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup); 2945 2946 _wq_thactive_inc(wq, qos); 2947 wq->wq_thscheduled_count[_wq_bucket(qos)]++; 2948 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 2949 wq->wq_fulfilled++; 2950 2951 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 2952 uth->uu_save.uus_workq_park_data.thread_request = req; 2953 if (needs_wakeup) { 2954 workq_thread_wakeup(uth); 2955 } 2956 unpaced--; 2957 reqcount--; 2958 } 2959 } while (unpaced && wq->wq_nthreads < wq_max_threads && 2960 workq_add_new_idle_thread(p, wq)); 2961 2962 if (_wq_exiting(wq)) { 2963 goto unlock_and_exit; 2964 } 2965 2966 req->tr_count = (uint16_t)reqcount; 2967 if (workq_threadreq_enqueue(wq, req)) { 2968 /* This can drop the workqueue lock, and take it again */ 2969 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 2970 } 2971 workq_unlock(wq); 2972 return 0; 2973 2974 unlock_and_exit: 2975 workq_unlock(wq); 2976 free_and_exit: 2977 zfree(workq_zone_threadreq, req); 2978 exit: 2979 return ret; 2980 } 2981 2982 bool 2983 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req, 2984 struct turnstile *workloop_ts, thread_qos_t qos, 2985 workq_kern_threadreq_flags_t flags) 2986 { 2987 struct workqueue *wq = proc_get_wqptr_fast(p); 2988 struct uthread *uth = NULL; 2989 2990 assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)); 2991 2992 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 2993 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req); 2994 qos = thread_workq_qos_for_pri(trp.trp_pri); 2995 if (qos == THREAD_QOS_UNSPECIFIED) { 2996 qos = WORKQ_THREAD_QOS_ABOVEUI; 2997 } 2998 } 2999 3000 assert(req->tr_state == WORKQ_TR_STATE_IDLE); 3001 priority_queue_entry_init(&req->tr_entry); 3002 req->tr_count = 1; 3003 req->tr_state = WORKQ_TR_STATE_NEW; 3004 req->tr_qos = qos; 3005 3006 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq, 3007 workq_trace_req_id(req), qos, 1); 3008 3009 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) { 3010 /* 3011 * we're called back synchronously from the context of 3012 * kqueue_threadreq_unbind from within workq_thread_return() 3013 * we can try to match up this thread with this request ! 3014 */ 3015 uth = current_uthread(); 3016 assert(uth->uu_kqr_bound == NULL); 3017 } 3018 3019 workq_lock_spin(wq); 3020 if (_wq_exiting(wq)) { 3021 req->tr_state = WORKQ_TR_STATE_IDLE; 3022 workq_unlock(wq); 3023 return false; 3024 } 3025 3026 if (uth && workq_threadreq_admissible(wq, uth, req)) { 3027 /* This is the case of the rebind - we were about to park and unbind 3028 * when more events came so keep the binding. 3029 */ 3030 assert(uth != wq->wq_creator); 3031 3032 if (uth->uu_workq_pri.qos_bucket != req->tr_qos) { 3033 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos); 3034 workq_thread_reset_pri(wq, uth, req, /*unpark*/ false); 3035 } 3036 /* 3037 * We're called from workq_kern_threadreq_initiate() 3038 * due to an unbind, with the kq req held. 
3039 */ 3040 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 3041 workq_trace_req_id(req), req->tr_flags, 0); 3042 wq->wq_fulfilled++; 3043 3044 kqueue_threadreq_bind(p, req, get_machthread(uth), 0); 3045 } else { 3046 if (workloop_ts) { 3047 workq_perform_turnstile_operation_locked(wq, ^{ 3048 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile, 3049 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 3050 turnstile_update_inheritor_complete(workloop_ts, 3051 TURNSTILE_INTERLOCK_HELD); 3052 }); 3053 } 3054 3055 bool reevaluate_creator_thread_group = false; 3056 #if CONFIG_PREADOPT_TG 3057 reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG); 3058 #endif 3059 /* We enqueued the highest priority item or we may need to reevaluate if 3060 * the creator needs a thread group pre-adoption */ 3061 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) { 3062 workq_schedule_creator(p, wq, flags); 3063 } 3064 } 3065 3066 workq_unlock(wq); 3067 3068 return true; 3069 } 3070 3071 void 3072 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req, 3073 thread_qos_t qos, workq_kern_threadreq_flags_t flags) 3074 { 3075 struct workqueue *wq = proc_get_wqptr_fast(p); 3076 bool make_overcommit = false; 3077 3078 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 3079 /* Requests outside-of-QoS shouldn't accept modify operations */ 3080 return; 3081 } 3082 3083 workq_lock_spin(wq); 3084 3085 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 3086 assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)); 3087 3088 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 3089 kqueue_threadreq_bind(p, req, req->tr_thread, 0); 3090 workq_unlock(wq); 3091 return; 3092 } 3093 3094 if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) { 3095 /* TODO (rokhinip): We come into this code path for kqwl thread 3096 * requests. kqwl requests cannot be cooperative. 3097 */ 3098 assert(!workq_threadreq_is_cooperative(req)); 3099 3100 make_overcommit = workq_threadreq_is_nonovercommit(req); 3101 } 3102 3103 if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) { 3104 workq_unlock(wq); 3105 return; 3106 } 3107 3108 assert(req->tr_count == 1); 3109 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 3110 panic("Invalid thread request (%p) state %d", req, req->tr_state); 3111 } 3112 3113 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq, 3114 workq_trace_req_id(req), qos, 0); 3115 3116 struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req); 3117 workq_threadreq_t req_max; 3118 3119 /* 3120 * Stage 1: Dequeue the request from its priority queue. 3121 * 3122 * If we dequeue the root item of the constrained priority queue, 3123 * maintain the best constrained request qos invariant. 3124 */ 3125 if (priority_queue_remove(pq, &req->tr_entry)) { 3126 if (workq_threadreq_is_nonovercommit(req)) { 3127 _wq_thactive_refresh_best_constrained_req_qos(wq); 3128 } 3129 } 3130 3131 /* 3132 * Stage 2: Apply changes to the thread request 3133 * 3134 * If the item will not become the root of the priority queue it belongs to, 3135 * then we need to wait in line, just enqueue and return quickly. 
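	 *
	 * (Illustrative fast path: raising a queued request from UT to IN while
	 * another request at UI already sits at the head of the same priority
	 * queue; the request is simply re-inserted below it and no creator
	 * re-evaluation is needed.)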
3136 */ 3137 if (__improbable(make_overcommit)) { 3138 req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT; 3139 pq = workq_priority_queue_for_req(wq, req); 3140 } 3141 req->tr_qos = qos; 3142 3143 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry); 3144 if (req_max && req_max->tr_qos >= qos) { 3145 priority_queue_entry_set_sched_pri(pq, &req->tr_entry, 3146 workq_priority_for_req(req), false); 3147 priority_queue_insert(pq, &req->tr_entry); 3148 workq_unlock(wq); 3149 return; 3150 } 3151 3152 /* 3153 * Stage 3: Reevaluate whether we should run the thread request. 3154 * 3155 * Pretend the thread request is new again: 3156 * - adjust wq_reqcount to not count it anymore. 3157 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock 3158 * properly attempts a synchronous bind) 3159 */ 3160 wq->wq_reqcount--; 3161 req->tr_state = WORKQ_TR_STATE_NEW; 3162 3163 /* We enqueued the highest priority item or we may need to reevaluate if 3164 * the creator needs a thread group pre-adoption if the request got a new TG */ 3165 bool reevaluate_creator_tg = false; 3166 3167 #if CONFIG_PREADOPT_TG 3168 reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG); 3169 #endif 3170 3171 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) { 3172 workq_schedule_creator(p, wq, flags); 3173 } 3174 workq_unlock(wq); 3175 } 3176 3177 void 3178 workq_kern_threadreq_lock(struct proc *p) 3179 { 3180 workq_lock_spin(proc_get_wqptr_fast(p)); 3181 } 3182 3183 void 3184 workq_kern_threadreq_unlock(struct proc *p) 3185 { 3186 workq_unlock(proc_get_wqptr_fast(p)); 3187 } 3188 3189 void 3190 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req, 3191 thread_t owner, struct turnstile *wl_ts, 3192 turnstile_update_flags_t flags) 3193 { 3194 struct workqueue *wq = proc_get_wqptr_fast(p); 3195 turnstile_inheritor_t inheritor; 3196 3197 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 3198 assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP); 3199 workq_lock_held(wq); 3200 3201 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 3202 kqueue_threadreq_bind(p, req, req->tr_thread, 3203 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE); 3204 return; 3205 } 3206 3207 if (_wq_exiting(wq)) { 3208 inheritor = TURNSTILE_INHERITOR_NULL; 3209 } else { 3210 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 3211 panic("Invalid thread request (%p) state %d", req, req->tr_state); 3212 } 3213 3214 if (owner) { 3215 inheritor = owner; 3216 flags |= TURNSTILE_INHERITOR_THREAD; 3217 } else { 3218 inheritor = wq->wq_turnstile; 3219 flags |= TURNSTILE_INHERITOR_TURNSTILE; 3220 } 3221 } 3222 3223 workq_perform_turnstile_operation_locked(wq, ^{ 3224 turnstile_update_inheritor(wl_ts, inheritor, flags); 3225 }); 3226 } 3227 3228 void 3229 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags) 3230 { 3231 struct workqueue *wq = proc_get_wqptr_fast(p); 3232 3233 workq_lock_spin(wq); 3234 workq_schedule_creator(p, wq, flags); 3235 workq_unlock(wq); 3236 } 3237 3238 /* 3239 * Always called at AST by the thread on itself 3240 * 3241 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides 3242 * on what the thread should do next. The TSD value is always set by the thread 3243 * on itself in the kernel and cleared either by userspace when it acks the TSD 3244 * value and takes action, or by the thread in the kernel when the quantum 3245 * expires again. 
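 *
 * Rough flow, using names from the body below: the kernel computes
 * PTHREAD_WQ_QUANTUM_EXPIRY_NARROW or PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE,
 * publishes it with kevent_set_workq_quantum_expiry_user_tsd(), and re-arms
 * the quantum with thread_arm_workqueue_quantum(); userspace then acks the
 * TSD value and either narrows (parks) the thread or shuffles its work items.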
3246 */ 3247 void 3248 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread) 3249 { 3250 struct uthread *uth = get_bsdthread_info(thread); 3251 3252 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 3253 return; 3254 } 3255 3256 if (!thread_supports_cooperative_workqueue(thread)) { 3257 panic("Quantum expired for thread that doesn't support cooperative workqueue"); 3258 } 3259 3260 thread_qos_t qos = uth->uu_workq_pri.qos_bucket; 3261 if (qos == THREAD_QOS_UNSPECIFIED) { 3262 panic("Thread should not have workq bucket of QoS UN"); 3263 } 3264 3265 assert(thread_has_expired_workqueue_quantum(thread, false)); 3266 3267 struct workqueue *wq = proc_get_wqptr(proc); 3268 assert(wq != NULL); 3269 3270 /* 3271 * For starters, we're just going to evaluate and see if we need to narrow 3272 * the pool and tell this thread to park if needed. In the future, we'll 3273 * evaluate and convey other workqueue state information like needing to 3274 * pump kevents, etc. 3275 */ 3276 uint64_t flags = 0; 3277 3278 workq_lock_spin(wq); 3279 3280 if (workq_thread_is_cooperative(uth)) { 3281 if (!workq_cooperative_allowance(wq, qos, uth, false)) { 3282 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW; 3283 } else { 3284 /* In the future, when we have kevent hookups for the cooperative 3285 * pool, we need fancier logic for what userspace should do. But 3286 * right now, only userspace thread requests exist - so we'll just 3287 * tell userspace to shuffle work items */ 3288 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE; 3289 } 3290 } else if (workq_thread_is_nonovercommit(uth)) { 3291 if (!workq_constrained_allowance(wq, qos, uth, false)) { 3292 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW; 3293 } 3294 } 3295 workq_unlock(wq); 3296 3297 WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0); 3298 3299 kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags); 3300 3301 /* We have conveyed to userspace about what it needs to do upon quantum 3302 * expiry, now rearm the workqueue quantum again */ 3303 thread_arm_workqueue_quantum(get_machthread(uth)); 3304 } 3305 3306 void 3307 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked) 3308 { 3309 if (locked) { 3310 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE); 3311 } else { 3312 workq_schedule_immediate_thread_creation(wq); 3313 } 3314 } 3315 3316 static int 3317 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap, 3318 struct workqueue *wq) 3319 { 3320 thread_t th = current_thread(); 3321 struct uthread *uth = get_bsdthread_info(th); 3322 workq_threadreq_t kqr = uth->uu_kqr_bound; 3323 workq_threadreq_param_t trp = { }; 3324 int nevents = uap->affinity, error; 3325 user_addr_t eventlist = uap->item; 3326 3327 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 3328 (uth->uu_workq_flags & UT_WORKQ_DYING)) { 3329 return EINVAL; 3330 } 3331 3332 if (eventlist && nevents && kqr == NULL) { 3333 return EINVAL; 3334 } 3335 3336 /* reset signal mask on the workqueue thread to default state */ 3337 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) { 3338 proc_lock(p); 3339 uth->uu_sigmask = ~workq_threadmask; 3340 proc_unlock(p); 3341 } 3342 3343 if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 3344 /* 3345 * Ensure we store the threadreq param before unbinding 3346 * the kqr from this thread. 3347 */ 3348 trp = kqueue_threadreq_workloop_param(kqr); 3349 } 3350 3351 /* 3352 * Freeze the base pri while we decide the fate of this thread. 
3353 * 3354 * Either: 3355 * - we return to user and kevent_cleanup will have unfrozen the base pri, 3356 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will. 3357 */ 3358 thread_freeze_base_pri(th); 3359 3360 if (kqr) { 3361 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE; 3362 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 3363 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 3364 } else { 3365 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 3366 } 3367 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 3368 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 3369 } else { 3370 if (workq_thread_is_overcommit(uth)) { 3371 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 3372 } 3373 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 3374 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 3375 } else { 3376 upcall_flags |= uth->uu_workq_pri.qos_req | 3377 WQ_FLAG_THREAD_PRIO_QOS; 3378 } 3379 } 3380 error = pthread_functions->workq_handle_stack_events(p, th, 3381 get_task_map(proc_task(p)), uth->uu_workq_stackaddr, 3382 uth->uu_workq_thport, eventlist, nevents, upcall_flags); 3383 if (error) { 3384 assert(uth->uu_kqr_bound == kqr); 3385 return error; 3386 } 3387 3388 // pthread is supposed to pass KEVENT_FLAG_PARKING here 3389 // which should cause the above call to either: 3390 // - not return 3391 // - return an error 3392 // - return 0 and have unbound properly 3393 assert(uth->uu_kqr_bound == NULL); 3394 } 3395 3396 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0); 3397 3398 thread_sched_call(th, NULL); 3399 thread_will_park_or_terminate(th); 3400 #if CONFIG_WORKLOOP_DEBUG 3401 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, }); 3402 #endif 3403 3404 workq_lock_spin(wq); 3405 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0); 3406 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value; 3407 workq_select_threadreq_or_park_and_unlock(p, wq, uth, 3408 WQ_SETUP_CLEAR_VOUCHER); 3409 __builtin_unreachable(); 3410 } 3411 3412 /** 3413 * Multiplexed call to interact with the workqueue mechanism 3414 */ 3415 int 3416 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval) 3417 { 3418 int options = uap->options; 3419 int arg2 = uap->affinity; 3420 int arg3 = uap->prio; 3421 struct workqueue *wq = proc_get_wqptr(p); 3422 int error = 0; 3423 3424 if ((p->p_lflag & P_LREGISTER) == 0) { 3425 return EINVAL; 3426 } 3427 3428 switch (options) { 3429 case WQOPS_QUEUE_NEWSPISUPP: { 3430 /* 3431 * arg2 = offset of serialno into dispatch queue 3432 * arg3 = kevent support 3433 */ 3434 int offset = arg2; 3435 if (arg3 & 0x01) { 3436 // If we get here, then userspace has indicated support for kevent delivery. 
3437 } 3438 3439 p->p_dispatchqueue_serialno_offset = (uint64_t)offset; 3440 break; 3441 } 3442 case WQOPS_QUEUE_REQTHREADS: { 3443 /* 3444 * arg2 = number of threads to start 3445 * arg3 = priority 3446 */ 3447 error = workq_reqthreads(p, arg2, arg3, false); 3448 break; 3449 } 3450 /* For requesting threads for the cooperative pool */ 3451 case WQOPS_QUEUE_REQTHREADS2: { 3452 /* 3453 * arg2 = number of threads to start 3454 * arg3 = priority 3455 */ 3456 error = workq_reqthreads(p, arg2, arg3, true); 3457 break; 3458 } 3459 case WQOPS_SET_EVENT_MANAGER_PRIORITY: { 3460 /* 3461 * arg2 = priority for the manager thread 3462 * 3463 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set, 3464 * the low bits of the value contains a scheduling priority 3465 * instead of a QOS value 3466 */ 3467 pthread_priority_t pri = arg2; 3468 3469 if (wq == NULL) { 3470 error = EINVAL; 3471 break; 3472 } 3473 3474 /* 3475 * Normalize the incoming priority so that it is ordered numerically. 3476 */ 3477 if (_pthread_priority_has_sched_pri(pri)) { 3478 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK | 3479 _PTHREAD_PRIORITY_SCHED_PRI_FLAG); 3480 } else { 3481 thread_qos_t qos = _pthread_priority_thread_qos(pri); 3482 int relpri = _pthread_priority_relpri(pri); 3483 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE || 3484 qos == THREAD_QOS_UNSPECIFIED) { 3485 error = EINVAL; 3486 break; 3487 } 3488 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK; 3489 } 3490 3491 /* 3492 * If userspace passes a scheduling priority, that wins over any QoS. 3493 * Userspace should takes care not to lower the priority this way. 3494 */ 3495 workq_lock_spin(wq); 3496 if (wq->wq_event_manager_priority < (uint32_t)pri) { 3497 wq->wq_event_manager_priority = (uint32_t)pri; 3498 } 3499 workq_unlock(wq); 3500 break; 3501 } 3502 case WQOPS_THREAD_KEVENT_RETURN: 3503 case WQOPS_THREAD_WORKLOOP_RETURN: 3504 case WQOPS_THREAD_RETURN: { 3505 error = workq_thread_return(p, uap, wq); 3506 break; 3507 } 3508 3509 case WQOPS_SHOULD_NARROW: { 3510 /* 3511 * arg2 = priority to test 3512 * arg3 = unused 3513 */ 3514 thread_t th = current_thread(); 3515 struct uthread *uth = get_bsdthread_info(th); 3516 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 3517 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) { 3518 error = EINVAL; 3519 break; 3520 } 3521 3522 thread_qos_t qos = _pthread_priority_thread_qos(arg2); 3523 if (qos == THREAD_QOS_UNSPECIFIED) { 3524 error = EINVAL; 3525 break; 3526 } 3527 workq_lock_spin(wq); 3528 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false); 3529 workq_unlock(wq); 3530 3531 *retval = should_narrow; 3532 break; 3533 } 3534 case WQOPS_SETUP_DISPATCH: { 3535 /* 3536 * item = pointer to workq_dispatch_config structure 3537 * arg2 = sizeof(item) 3538 */ 3539 struct workq_dispatch_config cfg; 3540 bzero(&cfg, sizeof(cfg)); 3541 3542 error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2)); 3543 if (error) { 3544 break; 3545 } 3546 3547 if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS || 3548 cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) { 3549 error = ENOTSUP; 3550 break; 3551 } 3552 3553 /* Load fields from version 1 */ 3554 p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs; 3555 3556 /* Load fields from version 2 */ 3557 if (cfg.wdc_version >= 2) { 3558 p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs; 3559 } 3560 3561 break; 3562 } 3563 default: 3564 error = EINVAL; 3565 break; 3566 } 3567 3568 return error; 3569 } 3570 3571 /* 3572 * We have no 
work to do, park ourselves on the idle list. 3573 * 3574 * Consumes the workqueue lock and does not return. 3575 */ 3576 __attribute__((noreturn, noinline)) 3577 static void 3578 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth, 3579 uint32_t setup_flags) 3580 { 3581 assert(uth == current_uthread()); 3582 assert(uth->uu_kqr_bound == NULL); 3583 workq_push_idle_thread(p, wq, uth, setup_flags); // may not return 3584 3585 workq_thread_reset_cpupercent(NULL, uth); 3586 3587 #if CONFIG_PREADOPT_TG 3588 /* Clear the preadoption thread group on the thread. 3589 * 3590 * Case 1: 3591 * Creator thread which never picked up a thread request. We set a 3592 * preadoption thread group on creator threads but if it never picked 3593 * up a thread request and didn't go to userspace, then the thread will 3594 * park with a preadoption thread group but no explicitly adopted 3595 * voucher or work interval. 3596 * 3597 * We drop the preadoption thread group here before proceeding to park. 3598 * Note - we may get preempted when we drop the workq lock below. 3599 * 3600 * Case 2: 3601 * Thread picked up a thread request and bound to it and returned back 3602 * from userspace and is parking. At this point, preadoption thread 3603 * group should be NULL since the thread has unbound from the thread 3604 * request. So this operation should be a no-op. 3605 */ 3606 thread_set_preadopt_thread_group(get_machthread(uth), NULL); 3607 #endif 3608 3609 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) && 3610 !(uth->uu_workq_flags & UT_WORKQ_DYING)) { 3611 workq_unlock(wq); 3612 3613 /* 3614 * workq_push_idle_thread() will unset `has_stack` 3615 * if it wants us to free the stack before parking. 3616 */ 3617 if (!uth->uu_save.uus_workq_park_data.has_stack) { 3618 pthread_functions->workq_markfree_threadstack(p, 3619 get_machthread(uth), get_task_map(proc_task(p)), 3620 uth->uu_workq_stackaddr); 3621 } 3622 3623 /* 3624 * When we remove the voucher from the thread, we may lose our importance 3625 * causing us to get preempted, so we do this after putting the thread on 3626 * the idle list. Then, when we get our importance back we'll be able to 3627 * use this thread from e.g. the kevent call out to deliver a boosting 3628 * message. 3629 * 3630 * Note that setting the voucher to NULL will not clear the preadoption 3631 * thread since this thread could have become the creator again and 3632 * perhaps acquired a preadoption thread group. 3633 */ 3634 __assert_only kern_return_t kr; 3635 kr = thread_set_voucher_name(MACH_PORT_NULL); 3636 assert(kr == KERN_SUCCESS); 3637 3638 workq_lock_spin(wq); 3639 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP; 3640 setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER; 3641 } 3642 3643 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0); 3644 3645 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) { 3646 /* 3647 * While we'd dropped the lock to unset our voucher, someone came 3648 * around and made us runnable. But because we weren't waiting on the 3649 * event their thread_wakeup() was ineffectual. To correct for that, 3650 * we just run the continuation ourselves. 
3651 */
3652 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
3653 __builtin_unreachable();
3654 }
3655
3656 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3657 workq_unpark_for_death_and_unlock(p, wq, uth,
3658 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
3659 __builtin_unreachable();
3660 }
3661
3662 /* Disarm the workqueue quantum since the thread is now idle */
3663 thread_disarm_workqueue_quantum(get_machthread(uth));
3664
3665 thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
3666 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
3667 workq_unlock(wq);
3668 thread_block(workq_unpark_continue);
3669 __builtin_unreachable();
3670 }
3671
3672 static inline bool
3673 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3674 {
3675 /*
3676 * There's an event manager request and either:
3677 * - no event manager currently running
3678 * - we are re-using the event manager
3679 */
3680 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3681 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3682 }
3683
3684 static uint32_t
3685 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3686 struct uthread *uth, bool may_start_timer)
3687 {
3688 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3689 uint32_t count = 0;
3690
3691 uint32_t max_count = wq->wq_constrained_threads_scheduled;
3692 if (uth && workq_thread_is_nonovercommit(uth)) {
3693 /*
3694 * don't count the current thread as scheduled
3695 */
3696 assert(max_count > 0);
3697 max_count--;
3698 }
3699 if (max_count >= wq_max_constrained_threads) {
3700 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3701 wq->wq_constrained_threads_scheduled,
3702 wq_max_constrained_threads);
3703 /*
3704 * we need 1 or more constrained threads to return to the kernel before
3705 * we can dispatch additional work
3706 */
3707 return 0;
3708 }
3709 max_count = wq_max_constrained_threads - max_count; /* remaining constrained-thread allowance */
3710
3711 /*
3712 * Compute a metric for how many threads are active. We find the
3713 * highest priority request outstanding and then add up the number of active
3714 * threads in that and all higher-priority buckets. We'll also add any
3715 * "busy" threads which are not currently active but blocked recently enough
3716 * that we can't be sure that they won't be unblocked soon and start
3717 * being active again.
3718 *
3719 * We'll then compare this metric to our max concurrency to decide whether
3720 * to add a new thread.
3721 */
3722
3723 uint32_t busycount, thactive_count;
3724
3725 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3726 at_qos, &busycount, NULL);
3727
3728 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3729 at_qos <= uth->uu_workq_pri.qos_bucket) {
3730 /*
3731 * Don't count this thread as currently active, but only if it's not
3732 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3733 * managers.
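 * Without this adjustment, the caller's own activity would inflate the
 * count and make the admission check stricter than intended.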
3734 */ 3735 assert(thactive_count > 0); 3736 thactive_count--; 3737 } 3738 3739 count = wq_max_parallelism[_wq_bucket(at_qos)]; 3740 if (count > thactive_count + busycount) { 3741 count -= thactive_count + busycount; 3742 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2, 3743 thactive_count, busycount); 3744 return MIN(count, max_count); 3745 } else { 3746 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3, 3747 thactive_count, busycount); 3748 } 3749 3750 if (may_start_timer) { 3751 /* 3752 * If this is called from the add timer, we won't have another timer 3753 * fire when the thread exits the "busy" state, so rearm the timer. 3754 */ 3755 workq_schedule_delayed_thread_creation(wq, 0); 3756 } 3757 3758 return 0; 3759 } 3760 3761 static bool 3762 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 3763 workq_threadreq_t req) 3764 { 3765 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 3766 return workq_may_start_event_mgr_thread(wq, uth); 3767 } 3768 if (workq_threadreq_is_cooperative(req)) { 3769 return workq_cooperative_allowance(wq, req->tr_qos, uth, true); 3770 } 3771 if (workq_threadreq_is_nonovercommit(req)) { 3772 return workq_constrained_allowance(wq, req->tr_qos, uth, true); 3773 } 3774 3775 return true; 3776 } 3777 3778 /* 3779 * Called from the context of selecting thread requests for threads returning 3780 * from userspace or creator thread 3781 */ 3782 static workq_threadreq_t 3783 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth) 3784 { 3785 workq_lock_held(wq); 3786 3787 /* 3788 * If the current thread is cooperative, we need to exclude it as part of 3789 * cooperative schedule count since this thread is looking for a new 3790 * request. Change in the schedule count for cooperative pool therefore 3791 * requires us to reeevaluate the next best request for it. 3792 */ 3793 if (uth && workq_thread_is_cooperative(uth)) { 3794 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req); 3795 3796 (void) _wq_cooperative_queue_refresh_best_req_qos(wq); 3797 3798 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req); 3799 } else { 3800 /* 3801 * The old value that was already precomputed should be safe to use - 3802 * add an assert that asserts that the best req QoS doesn't change in 3803 * this case 3804 */ 3805 assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false); 3806 } 3807 3808 thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos; 3809 3810 /* There are no eligible requests in the cooperative pool */ 3811 if (qos == THREAD_QOS_UNSPECIFIED) { 3812 return NULL; 3813 } 3814 assert(qos != WORKQ_THREAD_QOS_ABOVEUI); 3815 assert(qos != WORKQ_THREAD_QOS_MANAGER); 3816 3817 uint8_t bucket = _wq_bucket(qos); 3818 assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])); 3819 3820 return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]); 3821 } 3822 3823 static workq_threadreq_t 3824 workq_threadreq_select_for_creator(struct workqueue *wq) 3825 { 3826 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 3827 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 3828 uint8_t pri = 0; 3829 3830 /* 3831 * Compute the best priority request, and ignore the turnstile for now 3832 */ 3833 3834 req_pri = priority_queue_max(&wq->wq_special_queue, 3835 struct workq_threadreq_s, tr_entry); 3836 if (req_pri) { 3837 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue, 3838 &req_pri->tr_entry); 3839 } 3840 3841 /* 3842 * Handle the manager thread request. 
The special queue might yield
3843 * a higher priority, but the manager always beats the QoS world.
3844 */
3845
3846 req_mgr = wq->wq_event_manager_threadreq;
3847 if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
3848 uint32_t mgr_pri = wq->wq_event_manager_priority;
3849
3850 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
3851 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
3852 } else {
3853 mgr_pri = thread_workq_pri_for_qos(
3854 _pthread_priority_thread_qos(mgr_pri));
3855 }
3856
3857 return mgr_pri >= pri ? req_mgr : req_pri;
3858 }
3859
3860 /*
3861 * Compute the best QoS Request, and check whether it beats the "pri" one
3862 *
3863 * Start by comparing the overcommit and the cooperative pool
3864 */
3865 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
3866 struct workq_threadreq_s, tr_entry);
3867 if (req_qos) {
3868 qos = req_qos->tr_qos;
3869 }
3870
3871 req_tmp = workq_cooperative_queue_best_req(wq, NULL);
3872 if (req_tmp && qos <= req_tmp->tr_qos) {
3873 /*
3874 * Cooperative TR is better between overcommit and cooperative. Note
3875 * that if the QoS is the same for overcommit and cooperative, we choose
3876 * cooperative.
3877 *
3878 * Pick cooperative pool if it passes the admissions check
3879 */
3880 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3881 req_qos = req_tmp;
3882 qos = req_qos->tr_qos;
3883 }
3884 }
3885
3886 /*
3887 * Compare the best QoS so far - either from the overcommit or the cooperative
3888 * pool - against the constrained pool
3889 */
3890 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
3891 struct workq_threadreq_s, tr_entry);
3892
3893 if (req_tmp && qos < req_tmp->tr_qos) {
3894 /*
3895 * Constrained pool is best in QoS between overcommit, cooperative
3896 * and constrained. Now check how it fares against the priority case
3897 */
3898 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
3899 return req_pri;
3900 }
3901
3902 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3903 /*
3904 * If the constrained thread request is the best one and passes
3905 * the admission check, pick it.
3906 */
3907 return req_tmp;
3908 }
3909 }
3910
3911 /*
3912 * Compare the best of the QoS world with the priority
3913 */
3914 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3915 return req_pri;
3916 }
3917
3918 if (req_qos) {
3919 return req_qos;
3920 }
3921
3922 /*
3923 * If we had no eligible request but we have a turnstile push,
3924 * it must be a non-overcommit thread request that failed
3925 * the admission check.
3926 *
3927 * Just fake a BG thread request so that if the push stops, the creator
3928 * priority just drops to 4.
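 * (4 is the scheduling priority a THREAD_QOS_BACKGROUND workqueue thread
 * runs at, matching the fake background request below.)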
3929 */ 3930 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) { 3931 static struct workq_threadreq_s workq_sync_push_fake_req = { 3932 .tr_qos = THREAD_QOS_BACKGROUND, 3933 }; 3934 3935 return &workq_sync_push_fake_req; 3936 } 3937 3938 return NULL; 3939 } 3940 3941 /* 3942 * Returns true if this caused a change in the schedule counts of the 3943 * cooperative pool 3944 */ 3945 static bool 3946 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq, 3947 struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags) 3948 { 3949 workq_lock_held(wq); 3950 3951 /* 3952 * Row: thread type 3953 * Column: Request type 3954 * 3955 * overcommit non-overcommit cooperative 3956 * overcommit X case 1 case 2 3957 * cooperative case 3 case 4 case 5 3958 * non-overcommit case 6 X case 7 3959 * 3960 * Move the thread to the right bucket depending on what state it currently 3961 * has and what state the thread req it picks, is going to have. 3962 * 3963 * Note that the creator thread is an overcommit thread. 3964 */ 3965 thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req; 3966 3967 /* 3968 * Anytime a cooperative bucket's schedule count changes, we need to 3969 * potentially refresh the next best QoS for that pool when we determine 3970 * the next request for the creator 3971 */ 3972 bool cooperative_pool_sched_count_changed = false; 3973 3974 if (workq_thread_is_overcommit(uth)) { 3975 if (workq_tr_is_nonovercommit(tr_flags)) { 3976 // Case 1: thread is overcommit, req is non-overcommit 3977 wq->wq_constrained_threads_scheduled++; 3978 } else if (workq_tr_is_cooperative(tr_flags)) { 3979 // Case 2: thread is overcommit, req is cooperative 3980 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 3981 cooperative_pool_sched_count_changed = true; 3982 } 3983 } else if (workq_thread_is_cooperative(uth)) { 3984 if (workq_tr_is_overcommit(tr_flags)) { 3985 // Case 3: thread is cooperative, req is overcommit 3986 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 3987 } else if (workq_tr_is_nonovercommit(tr_flags)) { 3988 // Case 4: thread is cooperative, req is non-overcommit 3989 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 3990 wq->wq_constrained_threads_scheduled++; 3991 } else { 3992 // Case 5: thread is cooperative, req is also cooperative 3993 assert(workq_tr_is_cooperative(tr_flags)); 3994 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos); 3995 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 3996 } 3997 cooperative_pool_sched_count_changed = true; 3998 } else { 3999 if (workq_tr_is_overcommit(tr_flags)) { 4000 // Case 6: Thread is non-overcommit, req is overcommit 4001 wq->wq_constrained_threads_scheduled--; 4002 } else if (workq_tr_is_cooperative(tr_flags)) { 4003 // Case 7: Thread is non-overcommit, req is cooperative 4004 wq->wq_constrained_threads_scheduled--; 4005 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos); 4006 cooperative_pool_sched_count_changed = true; 4007 } 4008 } 4009 4010 return cooperative_pool_sched_count_changed; 4011 } 4012 4013 static workq_threadreq_t 4014 workq_threadreq_select(struct workqueue *wq, struct uthread *uth) 4015 { 4016 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 4017 uintptr_t proprietor; 4018 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 4019 uint8_t pri = 0; 4020 4021 if (uth == wq->wq_creator) { 4022 uth = NULL; 4023 } 4024 4025 /* 4026 * Compute the best priority request (special or turnstile) 4027 */ 4028 4029 
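/*
 * The proprietor of the workqueue's highest turnstile, if any, is a
 * kqworkloop; its embedded thread request competes at that scheduling
 * priority against the special queue below.
 */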
pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
4030 &proprietor);
4031 if (pri) {
4032 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
4033 req_pri = &kqwl->kqwl_request;
4034 if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
4035 panic("Invalid thread request (%p) state %d",
4036 req_pri, req_pri->tr_state);
4037 }
4038 } else {
4039 req_pri = NULL;
4040 }
4041
4042 req_tmp = priority_queue_max(&wq->wq_special_queue,
4043 struct workq_threadreq_s, tr_entry);
4044 if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
4045 &req_tmp->tr_entry)) {
4046 req_pri = req_tmp;
4047 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
4048 &req_tmp->tr_entry);
4049 }
4050
4051 /*
4052 * Handle the manager thread request. The special queue might yield
4053 * a higher priority, but the manager always beats the QoS world.
4054 */
4055
4056 req_mgr = wq->wq_event_manager_threadreq;
4057 if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
4058 uint32_t mgr_pri = wq->wq_event_manager_priority;
4059
4060 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
4061 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
4062 } else {
4063 mgr_pri = thread_workq_pri_for_qos(
4064 _pthread_priority_thread_qos(mgr_pri));
4065 }
4066
4067 return mgr_pri >= pri ? req_mgr : req_pri;
4068 }
4069
4070 /*
4071 * Compute the best QoS Request, and check whether it beats the "pri" one
4072 */
4073
4074 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
4075 struct workq_threadreq_s, tr_entry);
4076 if (req_qos) {
4077 qos = req_qos->tr_qos;
4078 }
4079
4080 req_tmp = workq_cooperative_queue_best_req(wq, uth);
4081 if (req_tmp && qos <= req_tmp->tr_qos) {
4082 /*
4083 * Cooperative TR is better between overcommit and cooperative. Note
4084 * that if the QoS is the same for overcommit and cooperative, we choose
4085 * cooperative.
4086 *
4087 * Pick cooperative pool if it passes the admissions check
4088 */
4089 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
4090 req_qos = req_tmp;
4091 qos = req_qos->tr_qos;
4092 }
4093 }
4094
4095 /*
4096 * Compare the best QoS so far - either from the overcommit or the cooperative
4097 * pool - against the constrained pool
4098 */
4099 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
4100 struct workq_threadreq_s, tr_entry);
4101
4102 if (req_tmp && qos < req_tmp->tr_qos) {
4103 /*
4104 * Constrained pool is best in QoS between overcommit, cooperative
4105 * and constrained. Now check how it fares against the priority case
4106 */
4107 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
4108 return req_pri;
4109 }
4110
4111 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
4112 /*
4113 * If the constrained thread request is the best one and passes
4114 * the admission check, pick it.
4115 */
4116 return req_tmp;
4117 }
4118 }
4119
4120 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
4121 return req_pri;
4122 }
4123
4124 return req_qos;
4125 }
4126
4127 /*
4128 * The creator is an anonymous thread used to make other threads: it is
4129 * counted as scheduled, but otherwise has no scheduler callback set and is
4130 * not tracked as active.
4131 *
4132 * When more requests are added or an existing one is hurried along,
4133 * a creator is elected and set up, or the existing one is overridden accordingly.
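 * (Election and any priority override both happen in workq_schedule_creator()
 * below; an existing creator simply has its priority reset to match the new
 * best request.)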
4134 * 4135 * While this creator is in flight, because no request has been dequeued, 4136 * already running threads have a chance at stealing thread requests avoiding 4137 * useless context switches, and the creator once scheduled may not find any 4138 * work to do and will then just park again. 4139 * 4140 * The creator serves the dual purpose of informing the scheduler of work that 4141 * hasn't be materialized as threads yet, and also as a natural pacing mechanism 4142 * for thread creation. 4143 * 4144 * By being anonymous (and not bound to anything) it means that thread requests 4145 * can be stolen from this creator by threads already on core yielding more 4146 * efficient scheduling and reduced context switches. 4147 */ 4148 static void 4149 workq_schedule_creator(proc_t p, struct workqueue *wq, 4150 workq_kern_threadreq_flags_t flags) 4151 { 4152 workq_threadreq_t req; 4153 struct uthread *uth; 4154 bool needs_wakeup; 4155 4156 workq_lock_held(wq); 4157 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0); 4158 4159 again: 4160 uth = wq->wq_creator; 4161 4162 if (!wq->wq_reqcount) { 4163 /* 4164 * There is no thread request left. 4165 * 4166 * If there is a creator, leave everything in place, so that it cleans 4167 * up itself in workq_push_idle_thread(). 4168 * 4169 * Else, make sure the turnstile state is reset to no inheritor. 4170 */ 4171 if (uth == NULL) { 4172 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 4173 } 4174 return; 4175 } 4176 4177 req = workq_threadreq_select_for_creator(wq); 4178 if (req == NULL) { 4179 /* 4180 * There isn't a thread request that passes the admission check. 4181 * 4182 * If there is a creator, do not touch anything, the creator will sort 4183 * it out when it runs. 4184 * 4185 * Else, set the inheritor to "WORKQ" so that the turnstile propagation 4186 * code calls us if anything changes. 4187 */ 4188 if (uth == NULL) { 4189 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ); 4190 } 4191 return; 4192 } 4193 4194 4195 if (uth) { 4196 /* 4197 * We need to maybe override the creator we already have 4198 */ 4199 if (workq_thread_needs_priority_change(req, uth)) { 4200 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 4201 wq, 1, uthread_tid(uth), req->tr_qos); 4202 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4203 } 4204 assert(wq->wq_inheritor == get_machthread(uth)); 4205 } else if (wq->wq_thidlecount) { 4206 /* 4207 * We need to unpark a creator thread 4208 */ 4209 wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT, 4210 &needs_wakeup); 4211 /* Always reset the priorities on the newly chosen creator */ 4212 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4213 workq_turnstile_update_inheritor(wq, get_machthread(uth), 4214 TURNSTILE_INHERITOR_THREAD); 4215 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 4216 wq, 2, uthread_tid(uth), req->tr_qos); 4217 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 4218 uth->uu_save.uus_workq_park_data.yields = 0; 4219 if (needs_wakeup) { 4220 workq_thread_wakeup(uth); 4221 } 4222 } else { 4223 /* 4224 * We need to allocate a thread... 
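 * If the pool is already at wq_max_threads, give up. Otherwise, depending
 * on the flags, either set an AST so the request is redriven later, punt to
 * the immediate thread-creation thread call, try to add a new idle thread
 * right here, or fall back to the delayed thread-creation timer.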
4225 */ 4226 if (__improbable(wq->wq_nthreads >= wq_max_threads)) { 4227 /* out of threads, just go away */ 4228 flags = WORKQ_THREADREQ_NONE; 4229 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) { 4230 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ); 4231 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) { 4232 /* This can drop the workqueue lock, and take it again */ 4233 workq_schedule_immediate_thread_creation(wq); 4234 } else if (workq_add_new_idle_thread(p, wq)) { 4235 goto again; 4236 } else { 4237 workq_schedule_delayed_thread_creation(wq, 0); 4238 } 4239 4240 /* 4241 * If the current thread is the inheritor: 4242 * 4243 * If we set the AST, then the thread will stay the inheritor until 4244 * either the AST calls workq_kern_threadreq_redrive(), or it parks 4245 * and calls workq_push_idle_thread(). 4246 * 4247 * Else, the responsibility of the thread creation is with a thread-call 4248 * and we need to clear the inheritor. 4249 */ 4250 if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 && 4251 wq->wq_inheritor == current_thread()) { 4252 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 4253 } 4254 } 4255 } 4256 4257 /** 4258 * Same as workq_unpark_select_threadreq_or_park_and_unlock, 4259 * but do not allow early binds. 4260 * 4261 * Called with the base pri frozen, will unfreeze it. 4262 */ 4263 __attribute__((noreturn, noinline)) 4264 static void 4265 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 4266 struct uthread *uth, uint32_t setup_flags) 4267 { 4268 workq_threadreq_t req = NULL; 4269 bool is_creator = (wq->wq_creator == uth); 4270 bool schedule_creator = false; 4271 4272 if (__improbable(_wq_exiting(wq))) { 4273 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0); 4274 goto park; 4275 } 4276 4277 if (wq->wq_reqcount == 0) { 4278 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0); 4279 goto park; 4280 } 4281 4282 req = workq_threadreq_select(wq, uth); 4283 if (__improbable(req == NULL)) { 4284 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0); 4285 goto park; 4286 } 4287 4288 struct uu_workq_policy old_pri = uth->uu_workq_pri; 4289 uint8_t tr_flags = req->tr_flags; 4290 struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req); 4291 4292 /* 4293 * Attempt to setup ourselves as the new thing to run, moving all priority 4294 * pushes to ourselves. 4295 * 4296 * If the current thread is the creator, then the fact that we are presently 4297 * running is proof that we'll do something useful, so keep going. 4298 * 4299 * For other cases, peek at the AST to know whether the scheduler wants 4300 * to preempt us, if yes, park instead, and move the thread request 4301 * turnstile back to the workqueue. 4302 */ 4303 if (req_ts) { 4304 workq_perform_turnstile_operation_locked(wq, ^{ 4305 turnstile_update_inheritor(req_ts, get_machthread(uth), 4306 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD); 4307 turnstile_update_inheritor_complete(req_ts, 4308 TURNSTILE_INTERLOCK_HELD); 4309 }); 4310 } 4311 4312 /* accounting changes of aggregate thscheduled_count and thactive which has 4313 * to be paired with the workq_thread_reset_pri below so that we have 4314 * uth->uu_workq_pri match with thactive. 
4315 * 4316 * This is undone when the thread parks */ 4317 if (is_creator) { 4318 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0, 4319 uth->uu_save.uus_workq_park_data.yields); 4320 wq->wq_creator = NULL; 4321 _wq_thactive_inc(wq, req->tr_qos); 4322 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++; 4323 } else if (old_pri.qos_bucket != req->tr_qos) { 4324 _wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos); 4325 } 4326 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 4327 4328 /* 4329 * Make relevant accounting changes for pool specific counts. 4330 * 4331 * The schedule counts changing can affect what the next best request 4332 * for cooperative thread pool is if this request is dequeued. 4333 */ 4334 bool cooperative_sched_count_changed = 4335 workq_adjust_cooperative_constrained_schedule_counts(wq, uth, 4336 old_pri.qos_req, tr_flags); 4337 4338 if (workq_tr_is_overcommit(tr_flags)) { 4339 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT); 4340 } else if (workq_tr_is_cooperative(tr_flags)) { 4341 workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE); 4342 } else { 4343 workq_thread_set_type(uth, 0); 4344 } 4345 4346 if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) { 4347 if (req_ts) { 4348 workq_perform_turnstile_operation_locked(wq, ^{ 4349 turnstile_update_inheritor(req_ts, wq->wq_turnstile, 4350 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 4351 turnstile_update_inheritor_complete(req_ts, 4352 TURNSTILE_INTERLOCK_HELD); 4353 }); 4354 } 4355 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0); 4356 goto park_thawed; 4357 } 4358 4359 /* 4360 * We passed all checks, dequeue the request, bind to it, and set it up 4361 * to return to user. 4362 */ 4363 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 4364 workq_trace_req_id(req), tr_flags, 0); 4365 wq->wq_fulfilled++; 4366 schedule_creator = workq_threadreq_dequeue(wq, req, 4367 cooperative_sched_count_changed); 4368 4369 workq_thread_reset_cpupercent(req, uth); 4370 4371 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 4372 kqueue_threadreq_bind_prepost(p, req, uth); 4373 req = NULL; 4374 } else if (req->tr_count > 0) { 4375 req = NULL; 4376 } 4377 4378 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 4379 uth->uu_workq_flags ^= UT_WORKQ_NEW; 4380 setup_flags |= WQ_SETUP_FIRST_USE; 4381 } 4382 4383 /* If one of the following is true, call workq_schedule_creator (which also 4384 * adjusts priority of existing creator): 4385 * 4386 * - We are the creator currently so the wq may need a new creator 4387 * - The request we're binding to is the highest priority one, existing 4388 * creator's priority might need to be adjusted to reflect the next 4389 * highest TR 4390 */ 4391 if (is_creator || schedule_creator) { 4392 /* This can drop the workqueue lock, and take it again */ 4393 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 4394 } 4395 4396 workq_unlock(wq); 4397 4398 if (req) { 4399 zfree(workq_zone_threadreq, req); 4400 } 4401 4402 /* 4403 * Run Thread, Run! 
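 * The upcall flags computed below tell the userspace trampoline what kind of
 * thread this is (event manager, overcommit, cooperative, kevent and/or
 * workloop) before workq_setup_and_run() hands it back to user code.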
4404 */ 4405 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI; 4406 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 4407 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 4408 } else if (workq_tr_is_overcommit(tr_flags)) { 4409 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 4410 } else if (workq_tr_is_cooperative(tr_flags)) { 4411 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE; 4412 } 4413 if (tr_flags & WORKQ_TR_FLAG_KEVENT) { 4414 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 4415 assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0); 4416 } 4417 4418 if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 4419 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 4420 } 4421 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 4422 4423 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 4424 kqueue_threadreq_bind_commit(p, get_machthread(uth)); 4425 } else { 4426 #if CONFIG_PREADOPT_TG 4427 /* 4428 * The thread may have a preadopt thread group on it already because it 4429 * got tagged with it as a creator thread. So we need to make sure to 4430 * clear that since we don't have preadoption for anonymous thread 4431 * requests 4432 */ 4433 thread_set_preadopt_thread_group(get_machthread(uth), NULL); 4434 #endif 4435 } 4436 4437 workq_setup_and_run(p, uth, setup_flags); 4438 __builtin_unreachable(); 4439 4440 park: 4441 thread_unfreeze_base_pri(get_machthread(uth)); 4442 park_thawed: 4443 workq_park_and_unlock(p, wq, uth, setup_flags); 4444 } 4445 4446 /** 4447 * Runs a thread request on a thread 4448 * 4449 * - if thread is THREAD_NULL, will find a thread and run the request there. 4450 * Otherwise, the thread must be the current thread. 4451 * 4452 * - if req is NULL, will find the highest priority request and run that. If 4453 * it is not NULL, it must be a threadreq object in state NEW. If it can not 4454 * be run immediately, it will be enqueued and moved to state QUEUED. 4455 * 4456 * Either way, the thread request object serviced will be moved to state 4457 * BINDING and attached to the uthread. 4458 * 4459 * Should be called with the workqueue lock held. Will drop it. 4460 * Should be called with the base pri not frozen. 4461 */ 4462 __attribute__((noreturn, noinline)) 4463 static void 4464 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 4465 struct uthread *uth, uint32_t setup_flags) 4466 { 4467 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) { 4468 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 4469 setup_flags |= WQ_SETUP_FIRST_USE; 4470 } 4471 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND); 4472 /* 4473 * This pointer is possibly freed and only used for tracing purposes. 
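 * (It is only logged through VM_KERNEL_ADDRHIDE below and never dereferenced.)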
4474 */ 4475 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request; 4476 workq_unlock(wq); 4477 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 4478 VM_KERNEL_ADDRHIDE(req), 0, 0); 4479 (void)req; 4480 4481 workq_setup_and_run(p, uth, setup_flags); 4482 __builtin_unreachable(); 4483 } 4484 4485 thread_freeze_base_pri(get_machthread(uth)); 4486 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags); 4487 } 4488 4489 static bool 4490 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth) 4491 { 4492 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 4493 4494 if (qos >= THREAD_QOS_USER_INTERACTIVE) { 4495 return false; 4496 } 4497 4498 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot; 4499 if (wq->wq_fulfilled == snapshot) { 4500 return false; 4501 } 4502 4503 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)]; 4504 if (wq->wq_fulfilled - snapshot > conc) { 4505 /* we fulfilled more than NCPU requests since being dispatched */ 4506 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1, 4507 wq->wq_fulfilled, snapshot); 4508 return true; 4509 } 4510 4511 for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) { 4512 cnt += wq->wq_thscheduled_count[i]; 4513 } 4514 if (conc <= cnt) { 4515 /* We fulfilled requests and have more than NCPU scheduled threads */ 4516 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2, 4517 wq->wq_fulfilled, snapshot); 4518 return true; 4519 } 4520 4521 return false; 4522 } 4523 4524 /** 4525 * parked thread wakes up 4526 */ 4527 __attribute__((noreturn, noinline)) 4528 static void 4529 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused) 4530 { 4531 thread_t th = current_thread(); 4532 struct uthread *uth = get_bsdthread_info(th); 4533 proc_t p = current_proc(); 4534 struct workqueue *wq = proc_get_wqptr_fast(p); 4535 4536 workq_lock_spin(wq); 4537 4538 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) { 4539 /* 4540 * If the number of threads we have out are able to keep up with the 4541 * demand, then we should avoid sending this creator thread to 4542 * userspace. 4543 */ 4544 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 4545 uth->uu_save.uus_workq_park_data.yields++; 4546 workq_unlock(wq); 4547 thread_yield_with_continuation(workq_unpark_continue, NULL); 4548 __builtin_unreachable(); 4549 } 4550 4551 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) { 4552 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE); 4553 __builtin_unreachable(); 4554 } 4555 4556 if (__probable(wr == THREAD_AWAKENED)) { 4557 /* 4558 * We were set running, but for the purposes of dying. 4559 */ 4560 assert(uth->uu_workq_flags & UT_WORKQ_DYING); 4561 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0); 4562 } else { 4563 /* 4564 * workaround for <rdar://problem/38647347>, 4565 * in case we do hit userspace, make sure calling 4566 * workq_thread_terminate() does the right thing here, 4567 * and if we never call it, that workq_exit() will too because it sees 4568 * this thread on the runlist. 
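 * (A wait result of THREAD_INTERRUPTED means the park was aborted, e.g.
 * because the process is exiting, so mark the thread as dying before
 * unparking it for death.)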
4569 */ 4570 assert(wr == THREAD_INTERRUPTED); 4571 wq->wq_thdying_count++; 4572 uth->uu_workq_flags |= UT_WORKQ_DYING; 4573 } 4574 4575 workq_unpark_for_death_and_unlock(p, wq, uth, 4576 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE); 4577 __builtin_unreachable(); 4578 } 4579 4580 __attribute__((noreturn, noinline)) 4581 static void 4582 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags) 4583 { 4584 thread_t th = get_machthread(uth); 4585 vm_map_t vmap = get_task_map(proc_task(p)); 4586 4587 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 4588 /* 4589 * For preemption reasons, we want to reset the voucher as late as 4590 * possible, so we do it in two places: 4591 * - Just before parking (i.e. in workq_park_and_unlock()) 4592 * - Prior to doing the setup for the next workitem (i.e. here) 4593 * 4594 * Those two places are sufficient to ensure we always reset it before 4595 * it goes back out to user space, but be careful to not break that 4596 * guarantee. 4597 * 4598 * Note that setting the voucher to NULL will not clear the preadoption 4599 * thread group on this thread 4600 */ 4601 __assert_only kern_return_t kr; 4602 kr = thread_set_voucher_name(MACH_PORT_NULL); 4603 assert(kr == KERN_SUCCESS); 4604 } 4605 4606 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags; 4607 if (!(setup_flags & WQ_SETUP_FIRST_USE)) { 4608 upcall_flags |= WQ_FLAG_THREAD_REUSE; 4609 } 4610 4611 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 4612 /* 4613 * For threads that have an outside-of-QoS thread priority, indicate 4614 * to userspace that setting QoS should only affect the TSD and not 4615 * change QOS in the kernel. 4616 */ 4617 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 4618 } else { 4619 /* 4620 * Put the QoS class value into the lower bits of the reuse_thread 4621 * register, this is where the thread priority used to be stored 4622 * anyway. 4623 */ 4624 upcall_flags |= uth->uu_save.uus_workq_park_data.qos | 4625 WQ_FLAG_THREAD_PRIO_QOS; 4626 } 4627 4628 if (uth->uu_workq_thport == MACH_PORT_NULL) { 4629 /* convert_thread_to_port_pinned() consumes a reference */ 4630 thread_reference(th); 4631 /* Convert to immovable/pinned thread port, but port is not pinned yet */ 4632 ipc_port_t port = convert_thread_to_port_pinned(th); 4633 /* Atomically, pin and copy out the port */ 4634 uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p))); 4635 } 4636 4637 /* Thread has been set up to run, arm its next workqueue quantum or disarm 4638 * if it is no longer supporting that */ 4639 if (thread_supports_cooperative_workqueue(th)) { 4640 thread_arm_workqueue_quantum(th); 4641 } else { 4642 thread_disarm_workqueue_quantum(th); 4643 } 4644 4645 /* 4646 * Call out to pthread, this sets up the thread, pulls in kevent structs 4647 * onto the stack, sets up the thread state and then returns to userspace. 
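 * The heavy lifting happens in the pthread shim (workq_setup_thread), which
 * does not return to this function.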
4648 */ 4649 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START, 4650 proc_get_wqptr_fast(p), 0, 0, 0); 4651 4652 if (workq_thread_is_cooperative(uth)) { 4653 thread_sched_call(th, NULL); 4654 } else { 4655 thread_sched_call(th, workq_sched_callback); 4656 } 4657 4658 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 4659 uth->uu_workq_thport, 0, setup_flags, upcall_flags); 4660 4661 __builtin_unreachable(); 4662 } 4663 4664 #pragma mark misc 4665 4666 int 4667 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo) 4668 { 4669 struct workqueue *wq = proc_get_wqptr(p); 4670 int error = 0; 4671 int activecount; 4672 4673 if (wq == NULL) { 4674 return EINVAL; 4675 } 4676 4677 /* 4678 * This is sometimes called from interrupt context by the kperf sampler. 4679 * In that case, it's not safe to spin trying to take the lock since we 4680 * might already hold it. So, we just try-lock it and error out if it's 4681 * already held. Since this is just a debugging aid, and all our callers 4682 * are able to handle an error, that's fine. 4683 */ 4684 bool locked = workq_lock_try(wq); 4685 if (!locked) { 4686 return EBUSY; 4687 } 4688 4689 wq_thactive_t act = _wq_thactive(wq); 4690 activecount = _wq_thactive_aggregate_downto_qos(wq, act, 4691 WORKQ_THREAD_QOS_MIN, NULL, NULL); 4692 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) { 4693 activecount++; 4694 } 4695 pwqinfo->pwq_nthreads = wq->wq_nthreads; 4696 pwqinfo->pwq_runthreads = activecount; 4697 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount; 4698 pwqinfo->pwq_state = 0; 4699 4700 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 4701 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT; 4702 } 4703 4704 if (wq->wq_nthreads >= wq_max_threads) { 4705 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT; 4706 } 4707 4708 workq_unlock(wq); 4709 return error; 4710 } 4711 4712 boolean_t 4713 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total, 4714 boolean_t *exceeded_constrained) 4715 { 4716 proc_t p = v; 4717 struct proc_workqueueinfo pwqinfo; 4718 int err; 4719 4720 assert(p != NULL); 4721 assert(exceeded_total != NULL); 4722 assert(exceeded_constrained != NULL); 4723 4724 err = fill_procworkqueue(p, &pwqinfo); 4725 if (err) { 4726 return FALSE; 4727 } 4728 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) { 4729 return FALSE; 4730 } 4731 4732 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT); 4733 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT); 4734 4735 return TRUE; 4736 } 4737 4738 uint32_t 4739 workqueue_get_pwq_state_kdp(void * v) 4740 { 4741 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) == 4742 kTaskWqExceededConstrainedThreadLimit); 4743 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) == 4744 kTaskWqExceededTotalThreadLimit); 4745 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable); 4746 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT | 4747 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7); 4748 4749 if (v == NULL) { 4750 return 0; 4751 } 4752 4753 proc_t p = v; 4754 struct workqueue *wq = proc_get_wqptr(p); 4755 4756 if (wq == NULL || workq_lock_is_acquired_kdp(wq)) { 4757 return 0; 4758 } 4759 4760 uint32_t pwq_state = WQ_FLAGS_AVAILABLE; 4761 4762 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 4763 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT; 4764 } 4765 4766 if (wq->wq_nthreads >= wq_max_threads) 
{ 4767 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT; 4768 } 4769 4770 return pwq_state; 4771 } 4772 4773 void 4774 workq_init(void) 4775 { 4776 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs, 4777 NSEC_PER_USEC, &wq_stalled_window.abstime); 4778 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs, 4779 NSEC_PER_USEC, &wq_reduce_pool_window.abstime); 4780 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs, 4781 NSEC_PER_USEC, &wq_max_timer_interval.abstime); 4782 4783 thread_deallocate_daemon_register_queue(&workq_deallocate_queue, 4784 workq_deallocate_queue_invoke); 4785 } 4786
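/*
 * Illustrative sketch only (not part of the original sources): one way a
 * kernel client could consume workqueue_get_pwq_exceeded() above. The
 * surrounding caller and policy are hypothetical; only the function and its
 * signature come from this file.
 *
 *	boolean_t exceeded_total = FALSE, exceeded_constrained = FALSE;
 *
 *	if (workqueue_get_pwq_exceeded(p, &exceeded_total, &exceeded_constrained)) {
 *		if (exceeded_total) {
 *			// this process has reached wq_max_threads
 *		}
 *		if (exceeded_constrained) {
 *			// this process has reached wq_max_constrained_threads
 *		}
 *	}
 */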