xref: /xnu-11215/bsd/pthread/pthread_workqueue.c (revision aca3beaa)
1 /*
2  * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29 
30 #include <sys/cdefs.h>
31 
32 #include <kern/assert.h>
33 #include <kern/ast.h>
34 #include <kern/clock.h>
35 #include <kern/cpu_data.h>
36 #include <kern/kern_types.h>
37 #include <kern/policy_internal.h>
38 #include <kern/processor.h>
39 #include <kern/sched_prim.h>    /* for thread_exception_return */
40 #include <kern/task.h>
41 #include <kern/thread.h>
42 #include <kern/thread_group.h>
43 #include <kern/zalloc.h>
44 #include <mach/kern_return.h>
45 #include <mach/mach_param.h>
46 #include <mach/mach_port.h>
47 #include <mach/mach_types.h>
48 #include <mach/mach_vm.h>
49 #include <mach/sync_policy.h>
50 #include <mach/task.h>
51 #include <mach/thread_act.h> /* for thread_resume */
52 #include <mach/thread_policy.h>
53 #include <mach/thread_status.h>
54 #include <mach/vm_prot.h>
55 #include <mach/vm_statistics.h>
56 #include <machine/atomic.h>
57 #include <machine/machine_routines.h>
58 #include <machine/smp.h>
59 #include <vm/vm_map.h>
60 #include <vm/vm_protos.h>
61 
62 #include <sys/eventvar.h>
63 #include <sys/kdebug.h>
64 #include <sys/kernel.h>
65 #include <sys/lock.h>
66 #include <sys/param.h>
67 #include <sys/proc_info.h>      /* for fill_procworkqueue */
68 #include <sys/proc_internal.h>
69 #include <sys/pthread_shims.h>
70 #include <sys/resourcevar.h>
71 #include <sys/signalvar.h>
72 #include <sys/sysctl.h>
73 #include <sys/sysproto.h>
74 #include <sys/systm.h>
75 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
76 
77 #include <pthread/bsdthread_private.h>
78 #include <pthread/workqueue_syscalls.h>
79 #include <pthread/workqueue_internal.h>
80 #include <pthread/workqueue_trace.h>
81 
82 #include <os/log.h>
83 
84 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
85 static void workq_schedule_creator(proc_t p, struct workqueue *wq,
86     workq_kern_threadreq_flags_t flags);
87 
88 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
89     workq_threadreq_t req);
90 
91 static uint32_t workq_constrained_allowance(struct workqueue *wq,
92     thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);
93 
94 static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);
95 
96 static bool workq_thread_is_busy(uint64_t cur_ts,
97     _Atomic uint64_t *lastblocked_tsp);
98 
99 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
100 
101 static bool
102 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);
103 
104 static inline void
105 workq_lock_spin(struct workqueue *wq);
106 
107 static inline void
108 workq_unlock(struct workqueue *wq);
109 
110 #pragma mark globals
111 
/*
 * A time interval tunable kept in two representations: the microsecond value
 * that is exchanged through sysctl, and the value produced from it by
 * clock_interval_to_absolutetime_interval() (see workq_sysctl_handle_usecs).
 */
struct workq_usec_var {
	uint32_t usecs;         /* interval in microseconds */
	uint64_t abstime;       /* same interval converted to absolute time */
};

/*
 * Define a static workq_usec_var named `var` whose `usecs` field starts at
 * `init`, and register a read-write integer sysctl kern.<var>_usecs that is
 * serviced by workq_sysctl_handle_usecs().
 */
#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")
122 
123 static LCK_GRP_DECLARE(workq_lck_grp, "workq");
124 os_refgrp_decl(static, workq_refgrp, "workq", NULL);
125 
126 static ZONE_DEFINE(workq_zone_workqueue, "workq.wq",
127     sizeof(struct workqueue), ZC_NONE);
128 static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq",
129     sizeof(struct workq_threadreq_s), ZC_CACHING);
130 
131 static struct mpsc_daemon_queue workq_deallocate_queue;
132 
133 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
134 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
135 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
136 static uint32_t wq_max_threads              = WORKQUEUE_MAXTHREADS;
137 static uint32_t wq_max_constrained_threads  = WORKQUEUE_MAXTHREADS / 8;
138 static uint32_t wq_init_constrained_limit   = 1;
139 static uint16_t wq_death_max_load;
140 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
141 
142 /*
143  * This is not a hard limit but the max size we want to aim to hit across the
144  * entire cooperative pool. We can oversubscribe the pool due to non-cooperative
145  * workers and the max we will oversubscribe the pool by, is a total of
146  * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
147  */
148 static uint32_t wq_max_cooperative_threads;
149 
150 static inline uint32_t
151 wq_cooperative_queue_max_size(struct workqueue *wq)
152 {
153 	return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
154 }
155 
156 #pragma mark sysctls
157 
158 static int
159 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
160 {
161 #pragma unused(arg2)
162 	struct workq_usec_var *v = arg1;
163 	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
164 	if (error || !req->newptr) {
165 		return error;
166 	}
167 	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
168 	    &v->abstime);
169 	return 0;
170 }
171 
172 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
173     &wq_max_threads, 0, "");
174 
175 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
176     &wq_max_constrained_threads, 0, "");
177 
178 static int
179 wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
180 {
181 #pragma unused(arg1, arg2, oidp)
182 	int input_pool_size = 0;
183 	int changed;
184 	int error = 0;
185 
186 	error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
187 	if (error || !changed) {
188 		return error;
189 	}
190 
191 #define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
192 #define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
193 /* Not available currently, but sysctl interface is designed to allow these
194  * extra parameters:
195  *		WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all bucket)
196  *		WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
197  */
198 
199 	if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
200 	    && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
201 		error = EINVAL;
202 		goto out;
203 	}
204 
205 	proc_t p = req->p;
206 	struct workqueue *wq = proc_get_wqptr(p);
207 
208 	if (wq != NULL) {
209 		workq_lock_spin(wq);
210 		if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
211 			// Hackily enforce that the workqueue is still new (no requests or
212 			// threads)
213 			error = ENOTSUP;
214 		} else {
215 			wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
216 		}
217 		workq_unlock(wq);
218 	} else {
219 		/* This process has no workqueue, calling this syctl makes no sense */
220 		return ENOTSUP;
221 	}
222 
223 out:
224 	return error;
225 }
226 
227 SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
228     CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
229     wq_limit_cooperative_threads_for_proc,
230     "I", "Modify the max pool size of the cooperative pool");
231 
232 #pragma mark p_wqptr
233 
234 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
235 
236 static struct workqueue *
237 proc_get_wqptr_fast(struct proc *p)
238 {
239 	return os_atomic_load(&p->p_wqptr, relaxed);
240 }
241 
242 struct workqueue *
243 proc_get_wqptr(struct proc *p)
244 {
245 	struct workqueue *wq = proc_get_wqptr_fast(p);
246 	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
247 }
248 
249 static void
250 proc_set_wqptr(struct proc *p, struct workqueue *wq)
251 {
252 	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
253 	if (wq == WQPTR_IS_INITING_VALUE) {
254 		proc_lock(p);
255 		thread_wakeup(&p->p_wqptr);
256 		proc_unlock(p);
257 	}
258 }
259 
260 static bool
261 proc_init_wqptr_or_wait(struct proc *p)
262 {
263 	struct workqueue *wq;
264 
265 	proc_lock(p);
266 	wq = os_atomic_load(&p->p_wqptr, relaxed);
267 
268 	if (wq == NULL) {
269 		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
270 		proc_unlock(p);
271 		return true;
272 	}
273 
274 	if (wq == WQPTR_IS_INITING_VALUE) {
275 		assert_wait(&p->p_wqptr, THREAD_UNINT);
276 		proc_unlock(p);
277 		thread_block(THREAD_CONTINUE_NULL);
278 	} else {
279 		proc_unlock(p);
280 	}
281 	return false;
282 }
283 
284 static inline event_t
285 workq_parked_wait_event(struct uthread *uth)
286 {
287 	return (event_t)&uth->uu_workq_stackaddr;
288 }
289 
290 static inline void
291 workq_thread_wakeup(struct uthread *uth)
292 {
293 	thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
294 }
295 
296 #pragma mark wq_thactive
297 
298 #if defined(__LP64__)
299 // Layout is:
300 //   127 - 115 : 13 bits of zeroes
301 //   114 - 112 : best QoS among all pending constrained requests
302 //   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
303 #define WQ_THACTIVE_BUCKET_WIDTH 16
304 #define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
305 #else
306 // Layout is:
307 //   63 - 61 : best QoS among all pending constrained requests
308 //   60      : Manager bucket (0 or 1)
309 //   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
310 #define WQ_THACTIVE_BUCKET_WIDTH 10
311 #define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
312 #endif
313 #define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
314 #define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
315 
316 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
317     "Make sure we have space to encode a QoS");
318 
319 static inline wq_thactive_t
320 _wq_thactive(struct workqueue *wq)
321 {
322 	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
323 }
324 
325 static inline uint8_t
326 _wq_bucket(thread_qos_t qos)
327 {
328 	// Map both BG and MT to the same bucket by over-shifting down and
329 	// clamping MT and BG together.
330 	switch (qos) {
331 	case THREAD_QOS_MAINTENANCE:
332 		return 0;
333 	default:
334 		return qos - 2;
335 	}
336 }
337 
338 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
339 	        ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))
340 
341 static inline thread_qos_t
342 _wq_thactive_best_constrained_req_qos(struct workqueue *wq)
343 {
344 	// Avoid expensive atomic operations: the three bits we're loading are in
345 	// a single byte, and always updated under the workqueue lock
346 	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
347 	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
348 }
349 
350 static void
351 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
352 {
353 	thread_qos_t old_qos, new_qos;
354 	workq_threadreq_t req;
355 
356 	req = priority_queue_max(&wq->wq_constrained_queue,
357 	    struct workq_threadreq_s, tr_entry);
358 	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
359 	old_qos = _wq_thactive_best_constrained_req_qos(wq);
360 	if (old_qos != new_qos) {
361 		long delta = (long)new_qos - (long)old_qos;
362 		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
363 		/*
364 		 * We can do an atomic add relative to the initial load because updates
365 		 * to this qos are always serialized under the workqueue lock.
366 		 */
367 		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
368 #ifdef __LP64__
369 		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
370 		    (uint64_t)(v >> 64), 0);
371 #else
372 		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
373 #endif
374 	}
375 }
376 
377 static inline wq_thactive_t
378 _wq_thactive_offset_for_qos(thread_qos_t qos)
379 {
380 	uint8_t bucket = _wq_bucket(qos);
381 	__builtin_assume(bucket < WORKQ_NUM_BUCKETS);
382 	return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH);
383 }
384 
385 static inline wq_thactive_t
386 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
387 {
388 	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
389 	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
390 }
391 
392 static inline wq_thactive_t
393 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
394 {
395 	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
396 	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
397 }
398 
399 static inline void
400 _wq_thactive_move(struct workqueue *wq,
401     thread_qos_t old_qos, thread_qos_t new_qos)
402 {
403 	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
404 	    _wq_thactive_offset_for_qos(old_qos);
405 	os_atomic_add(&wq->wq_thactive, v, relaxed);
406 	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
407 	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
408 }
409 
410 static inline uint32_t
411 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
412     thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
413 {
414 	uint32_t count = 0, active;
415 	uint64_t curtime;
416 
417 	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);
418 
419 	if (busycount) {
420 		curtime = mach_absolute_time();
421 		*busycount = 0;
422 	}
423 	if (max_busycount) {
424 		*max_busycount = THREAD_QOS_LAST - qos;
425 	}
426 
427 	uint8_t i = _wq_bucket(qos);
428 	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
429 	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
430 		active = v & WQ_THACTIVE_BUCKET_MASK;
431 		count += active;
432 
433 		if (busycount && wq->wq_thscheduled_count[i] > active) {
434 			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
435 				/*
436 				 * We only consider the last blocked thread for a given bucket
437 				 * as busy because we don't want to take the list lock in each
438 				 * sched callback. However this is an approximation that could
439 				 * contribute to thread creation storms.
440 				 */
441 				(*busycount)++;
442 			}
443 		}
444 	}
445 
446 	return count;
447 }
448 
449 /* The input qos here should be the requested QoS of the thread, not accounting
450  * for any overrides */
451 static inline void
452 _wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
453 {
454 	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
455 	assert(old_scheduled_count > 0);
456 }
457 
458 /* The input qos here should be the requested QoS of the thread, not accounting
459  * for any overrides */
460 static inline void
461 _wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
462 {
463 	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
464 	assert(old_scheduled_count < UINT8_MAX);
465 }
466 
467 #pragma mark wq_flags
468 
469 static inline uint32_t
470 _wq_flags(struct workqueue *wq)
471 {
472 	return os_atomic_load(&wq->wq_flags, relaxed);
473 }
474 
475 static inline bool
476 _wq_exiting(struct workqueue *wq)
477 {
478 	return _wq_flags(wq) & WQ_EXITING;
479 }
480 
481 bool
482 workq_is_exiting(struct proc *p)
483 {
484 	struct workqueue *wq = proc_get_wqptr(p);
485 	return !wq || _wq_exiting(wq);
486 }
487 
488 
489 #pragma mark workqueue lock
490 
491 static bool
492 workq_lock_is_acquired_kdp(struct workqueue *wq)
493 {
494 	return kdp_lck_ticket_is_acquired(&wq->wq_lock);
495 }
496 
497 static inline void
498 workq_lock_spin(struct workqueue *wq)
499 {
500 	lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
501 }
502 
503 static inline void
504 workq_lock_held(struct workqueue *wq)
505 {
506 	LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
507 }
508 
509 static inline bool
510 workq_lock_try(struct workqueue *wq)
511 {
512 	return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
513 }
514 
515 static inline void
516 workq_unlock(struct workqueue *wq)
517 {
518 	lck_ticket_unlock(&wq->wq_lock);
519 }
520 
521 #pragma mark idle thread lists
522 
523 #define WORKQ_POLICY_INIT(qos) \
524 	        (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
525 
526 static inline thread_qos_t
527 workq_pri_bucket(struct uu_workq_policy req)
528 {
529 	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
530 }
531 
532 static inline thread_qos_t
533 workq_pri_override(struct uu_workq_policy req)
534 {
535 	return MAX(workq_pri_bucket(req), req.qos_bucket);
536 }
537 
538 static inline bool
539 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
540 {
541 	workq_threadreq_param_t cur_trp, req_trp = { };
542 
543 	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
544 	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
545 		req_trp = kqueue_threadreq_workloop_param(req);
546 	}
547 
548 	/*
549 	 * CPU percent flags are handled separately to policy changes, so ignore
550 	 * them for all of these checks.
551 	 */
552 	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
553 	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);
554 
555 	if (!req_flags && !cur_flags) {
556 		return false;
557 	}
558 
559 	if (req_flags != cur_flags) {
560 		return true;
561 	}
562 
563 	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
564 		return true;
565 	}
566 
567 	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
568 		return true;
569 	}
570 
571 	return false;
572 }
573 
574 static inline bool
575 workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
576 {
577 	if (workq_thread_needs_params_change(req, uth)) {
578 		return true;
579 	}
580 
581 	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
582 		return true;
583 	}
584 
585 #if CONFIG_PREADOPT_TG
586 	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
587 	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
588 		/*
589 		 * Ideally, we'd add check here to see if thread's preadopt TG is same
590 		 * as the thread requests's thread group and short circuit if that is
591 		 * the case. But in the interest of keeping the code clean and not
592 		 * taking the thread lock here, we're going to skip this. We will
593 		 * eventually shortcircuit once we try to set the preadoption thread
594 		 * group on the thread.
595 		 */
596 		return true;
597 	}
598 #endif
599 
600 	return false;
601 }
602 
603 /* Input thread must be self. Called during self override, resetting overrides
604  * or while processing kevents
605  *
606  * Called with workq lock held. Sometimes also the thread mutex
607  */
608 static void
609 workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
610     struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
611     bool force_run)
612 {
613 	assert(uth == current_uthread());
614 
615 	thread_qos_t old_bucket = old_pri.qos_bucket;
616 	thread_qos_t new_bucket = workq_pri_bucket(new_pri);
617 
618 	if (old_bucket != new_bucket) {
619 		_wq_thactive_move(wq, old_bucket, new_bucket);
620 	}
621 
622 	new_pri.qos_bucket = new_bucket;
623 	uth->uu_workq_pri = new_pri;
624 
625 	if (old_pri.qos_override != new_pri.qos_override) {
626 		thread_set_workq_override(get_machthread(uth), new_pri.qos_override);
627 	}
628 
629 	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
630 		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
631 		if (old_bucket > new_bucket) {
632 			/*
633 			 * When lowering our bucket, we may unblock a thread request,
634 			 * but we can't drop our priority before we have evaluated
635 			 * whether this is the case, and if we ever drop the workqueue lock
636 			 * that would cause a priority inversion.
637 			 *
638 			 * We hence have to disallow thread creation in that case.
639 			 */
640 			flags = 0;
641 		}
642 		workq_schedule_creator(p, wq, flags);
643 	}
644 }
645 
646 /*
647  * Sets/resets the cpu percent limits on the current thread. We can't set
648  * these limits from outside of the current thread, so this function needs
649  * to be called when we're executing on the intended
650  */
651 static void
652 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
653 {
654 	assert(uth == current_uthread());
655 	workq_threadreq_param_t trp = { };
656 
657 	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
658 		trp = kqueue_threadreq_workloop_param(req);
659 	}
660 
661 	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
662 		/*
663 		 * Going through disable when we have an existing CPU percent limit
664 		 * set will force the ledger to refill the token bucket of the current
665 		 * thread. Removing any penalty applied by previous thread use.
666 		 */
667 		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
668 		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
669 	}
670 
671 	if (trp.trp_flags & TRP_CPUPERCENT) {
672 		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
673 		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
674 		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
675 	}
676 }
677 
678 /* Called with the workq lock held */
679 static void
680 workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
681     workq_threadreq_t req, bool unpark)
682 {
683 	thread_t th = get_machthread(uth);
684 	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
685 	workq_threadreq_param_t trp = { };
686 	int priority = 31;
687 	int policy = POLICY_TIMESHARE;
688 
689 	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
690 		trp = kqueue_threadreq_workloop_param(req);
691 	}
692 
693 	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
694 	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;
695 
696 	if (unpark) {
697 		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
698 		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
699 		uth->uu_save.uus_workq_park_data.qos = qos;
700 	}
701 
702 	if (qos == WORKQ_THREAD_QOS_MANAGER) {
703 		uint32_t mgr_pri = wq->wq_event_manager_priority;
704 		assert(trp.trp_value == 0); // manager qos and thread policy don't mix
705 
706 		if (_pthread_priority_has_sched_pri(mgr_pri)) {
707 			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
708 			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
709 			    POLICY_TIMESHARE);
710 			return;
711 		}
712 
713 		qos = _pthread_priority_thread_qos(mgr_pri);
714 	} else {
715 		if (trp.trp_flags & TRP_PRIORITY) {
716 			qos = THREAD_QOS_UNSPECIFIED;
717 			priority = trp.trp_pri;
718 			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
719 		}
720 
721 		if (trp.trp_flags & TRP_POLICY) {
722 			policy = trp.trp_pol;
723 		}
724 	}
725 
726 #if CONFIG_PREADOPT_TG
727 	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
728 		/*
729 		 * We cannot safely read and borrow the reference from the kqwl since it
730 		 * can disappear from under us at any time due to the max-ing logic in
731 		 * kqueue_set_preadopted_thread_group.
732 		 *
733 		 * As such, we do the following dance:
734 		 *
735 		 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave
736 		 * behind with (NULL + QoS). At this point, we have the reference
737 		 * to the thread group from the kqwl.
738 		 * 2) Have the thread set the preadoption thread group on itself.
739 		 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to
740 		 * thread_group + QoS. ie we try to give the reference back to the kqwl.
741 		 * If we fail, that's because a higher QoS thread group was set on the
742 		 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to
743 		 * go back to (1).
744 		 */
745 
746 		_Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req);
747 
748 		thread_group_qos_t old_tg, new_tg;
749 		int ret = 0;
750 again:
751 		ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
752 			if (!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) {
753 			        os_atomic_rmw_loop_give_up(break);
754 			}
755 
756 			/*
757 			 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
758 			 * only modify it if there is a higher QoS thread group to attach
759 			 */
760 			new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK);
761 		});
762 
763 		if (ret) {
764 			/*
765 			 * We successfully took the ref from the kqwl so set it on the
766 			 * thread now
767 			 */
768 			thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));
769 
770 			thread_group_qos_t thread_group_to_expect = new_tg;
771 			thread_group_qos_t thread_group_to_set = old_tg;
772 
773 			os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
774 				if (old_tg != thread_group_to_expect) {
775 				        /*
776 				         * There was an intervening write to the kqwl_preadopt_tg,
777 				         * and it has a higher QoS than what we are working with
778 				         * here. Abandon our current adopted thread group and redo
779 				         * the full dance
780 				         */
781 				        thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
782 				        os_atomic_rmw_loop_give_up(goto again);
783 				}
784 
785 				new_tg = thread_group_to_set;
786 			});
787 		} else {
788 			/* Nothing valid on the kqwl, just clear what's on the thread */
789 			thread_set_preadopt_thread_group(th, NULL);
790 		}
791 	} else {
792 		/* Not even a kqwl, clear what's on the thread */
793 		thread_set_preadopt_thread_group(th, NULL);
794 	}
795 #endif
796 	thread_set_workq_pri(th, qos, priority, policy);
797 }
798 
799 /*
800  * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
801  * every time a servicer is being told about a new max QoS.
802  */
803 void
804 workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
805 {
806 	struct uu_workq_policy old_pri, new_pri;
807 	struct uthread *uth = current_uthread();
808 	struct workqueue *wq = proc_get_wqptr_fast(p);
809 	thread_qos_t qos = kqr->tr_kq_qos_index;
810 
811 	if (uth->uu_workq_pri.qos_max == qos) {
812 		return;
813 	}
814 
815 	workq_lock_spin(wq);
816 	old_pri = new_pri = uth->uu_workq_pri;
817 	new_pri.qos_max = qos;
818 	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
819 	workq_unlock(wq);
820 }
821 
822 #pragma mark idle threads accounting and handling
823 
824 static inline struct uthread *
825 workq_oldest_killable_idle_thread(struct workqueue *wq)
826 {
827 	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
828 
829 	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
830 		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
831 		if (uth) {
832 			assert(uth->uu_save.uus_workq_park_data.has_stack);
833 		}
834 	}
835 	return uth;
836 }
837 
838 static inline uint64_t
839 workq_kill_delay_for_idle_thread(struct workqueue *wq)
840 {
841 	uint64_t delay = wq_reduce_pool_window.abstime;
842 	uint16_t idle = wq->wq_thidlecount;
843 
844 	/*
845 	 * If we have less than wq_death_max_load threads, have a 5s timer.
846 	 *
847 	 * For the next wq_max_constrained_threads ones, decay linearly from
848 	 * from 5s to 50ms.
849 	 */
850 	if (idle <= wq_death_max_load) {
851 		return delay;
852 	}
853 
854 	if (wq_max_constrained_threads > idle - wq_death_max_load) {
855 		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
856 	}
857 	return delay / wq_max_constrained_threads;
858 }
859 
860 static inline bool
861 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
862     uint64_t now)
863 {
864 	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
865 	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
866 }
867 
868 static void
869 workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
870 {
871 	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);
872 
873 	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
874 		return;
875 	}
876 	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
877 
878 	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);
879 
880 	/*
881 	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
882 	 * can't be too short, so use 500ms which is long enough that we will not
883 	 * wake up the CPU for killing threads, but short enough that it doesn't
884 	 * fall into long-term timer list shenanigans.
885 	 */
886 	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
887 	    wq_reduce_pool_window.abstime / 10,
888 	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
889 }
890 
891 /*
892  * `decrement` is set to the number of threads that are no longer dying:
893  * - because they have been resuscitated just in time (workq_pop_idle_thread)
894  * - or have been killed (workq_thread_terminate).
895  */
896 static void
897 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
898 {
899 	struct uthread *uth;
900 
901 	assert(wq->wq_thdying_count >= decrement);
902 	if ((wq->wq_thdying_count -= decrement) > 0) {
903 		return;
904 	}
905 
906 	if (wq->wq_thidlecount <= 1) {
907 		return;
908 	}
909 
910 	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
911 		return;
912 	}
913 
914 	uint64_t now = mach_absolute_time();
915 	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
916 
917 	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
918 		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
919 		    wq, wq->wq_thidlecount, 0, 0);
920 		wq->wq_thdying_count++;
921 		uth->uu_workq_flags |= UT_WORKQ_DYING;
922 		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
923 			workq_thread_wakeup(uth);
924 		}
925 		return;
926 	}
927 
928 	workq_death_call_schedule(wq,
929 	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
930 }
931 
932 void
933 workq_thread_terminate(struct proc *p, struct uthread *uth)
934 {
935 	struct workqueue *wq = proc_get_wqptr_fast(p);
936 
937 	workq_lock_spin(wq);
938 	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
939 	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
940 		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
941 		    wq, wq->wq_thidlecount, 0, 0);
942 		workq_death_policy_evaluate(wq, 1);
943 	}
944 	if (wq->wq_nthreads-- == wq_max_threads) {
945 		/*
946 		 * We got under the thread limit again, which may have prevented
947 		 * thread creation from happening, redrive if there are pending requests
948 		 */
949 		if (wq->wq_reqcount) {
950 			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
951 		}
952 	}
953 	workq_unlock(wq);
954 
955 	thread_deallocate(get_machthread(uth));
956 }
957 
958 static void
959 workq_kill_old_threads_call(void *param0, void *param1 __unused)
960 {
961 	struct workqueue *wq = param0;
962 
963 	workq_lock_spin(wq);
964 	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
965 	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
966 	workq_death_policy_evaluate(wq, 0);
967 	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
968 	workq_unlock(wq);
969 }
970 
971 static struct uthread *
972 workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
973     bool *needs_wakeup)
974 {
975 	struct uthread *uth;
976 
977 	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
978 		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
979 	} else {
980 		uth = TAILQ_FIRST(&wq->wq_thnewlist);
981 		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
982 	}
983 	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
984 
985 	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
986 	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
987 
988 	/* A thread is never woken up as part of the cooperative pool */
989 	assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);
990 
991 	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
992 		wq->wq_constrained_threads_scheduled++;
993 	}
994 	wq->wq_threads_scheduled++;
995 	wq->wq_thidlecount--;
996 
997 	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
998 		uth->uu_workq_flags ^= UT_WORKQ_DYING;
999 		workq_death_policy_evaluate(wq, 1);
1000 		*needs_wakeup = false;
1001 	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
1002 		*needs_wakeup = false;
1003 	} else {
1004 		*needs_wakeup = true;
1005 	}
1006 	return uth;
1007 }
1008 
1009 /*
1010  * Called by thread_create_workq_waiting() during thread initialization, before
1011  * assert_wait, before the thread has been started.
1012  */
1013 event_t
1014 workq_thread_init_and_wq_lock(task_t task, thread_t th)
1015 {
1016 	struct uthread *uth = get_bsdthread_info(th);
1017 
1018 	uth->uu_workq_flags = UT_WORKQ_NEW;
1019 	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
1020 	uth->uu_workq_thport = MACH_PORT_NULL;
1021 	uth->uu_workq_stackaddr = 0;
1022 	uth->uu_workq_pthread_kill_allowed = 0;
1023 
1024 	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
1025 	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);
1026 
1027 	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
1028 	return workq_parked_wait_event(uth);
1029 }
1030 
1031 /**
1032  * Try to add a new workqueue thread.
1033  *
1034  * - called with workq lock held
1035  * - dropped and retaken around thread creation
1036  * - return with workq lock held
1037  */
1038 static bool
1039 workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
1040 {
1041 	mach_vm_offset_t th_stackaddr;
1042 	kern_return_t kret;
1043 	thread_t th;
1044 
1045 	wq->wq_nthreads++;
1046 
1047 	workq_unlock(wq);
1048 
1049 	vm_map_t vmap = get_task_map(proc_task(p));
1050 
1051 	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
1052 	if (kret != KERN_SUCCESS) {
1053 		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
1054 		    kret, 1, 0);
1055 		goto out;
1056 	}
1057 
1058 	kret = thread_create_workq_waiting(proc_task(p), workq_unpark_continue, &th);
1059 	if (kret != KERN_SUCCESS) {
1060 		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
1061 		    kret, 0, 0);
1062 		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
1063 		goto out;
1064 	}
1065 
1066 	// thread_create_workq_waiting() will return with the wq lock held
1067 	// on success, because it calls workq_thread_init_and_wq_lock() above
1068 
1069 	struct uthread *uth = get_bsdthread_info(th);
1070 
1071 	wq->wq_creations++;
1072 	wq->wq_thidlecount++;
1073 	uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
1074 	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
1075 
1076 	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
1077 	return true;
1078 
1079 out:
1080 	workq_lock_spin(wq);
1081 	/*
1082 	 * Do not redrive here if we went under wq_max_threads again,
1083 	 * it is the responsibility of the callers of this function
1084 	 * to do so when it fails.
1085 	 */
1086 	wq->wq_nthreads--;
1087 	return false;
1088 }
1089 
1090 static inline bool
1091 workq_thread_is_overcommit(struct uthread *uth)
1092 {
1093 	return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
1094 }
1095 
1096 static inline bool
1097 workq_thread_is_nonovercommit(struct uthread *uth)
1098 {
1099 	return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0;
1100 }
1101 
1102 static inline bool
1103 workq_thread_is_cooperative(struct uthread *uth)
1104 {
1105 	return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
1106 }
1107 
1108 static inline void
1109 workq_thread_set_type(struct uthread *uth, uint16_t flags)
1110 {
1111 	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
1112 	uth->uu_workq_flags |= flags;
1113 }
1114 
1115 
1116 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
1117 
1118 __attribute__((noreturn, noinline))
1119 static void
1120 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
1121     struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
1122 {
1123 	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
1124 	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;
1125 
1126 	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
1127 		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
1128 		qos = WORKQ_THREAD_QOS_CLEANUP;
1129 	}
1130 
1131 	workq_thread_reset_cpupercent(NULL, uth);
1132 
1133 	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
1134 		wq->wq_thidlecount--;
1135 		if (first_use) {
1136 			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
1137 		} else {
1138 			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
1139 		}
1140 	}
1141 	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
1142 
1143 	workq_unlock(wq);
1144 
1145 	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
1146 		__assert_only kern_return_t kr;
1147 		kr = thread_set_voucher_name(MACH_PORT_NULL);
1148 		assert(kr == KERN_SUCCESS);
1149 	}
1150 
1151 	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
1152 	thread_t th = get_machthread(uth);
1153 	vm_map_t vmap = get_task_map(proc_task(p));
1154 
1155 	if (!first_use) {
1156 		flags |= WQ_FLAG_THREAD_REUSE;
1157 	}
1158 
1159 	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
1160 	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
1161 	__builtin_unreachable();
1162 }
1163 
1164 bool
1165 workq_is_current_thread_updating_turnstile(struct workqueue *wq)
1166 {
1167 	return wq->wq_turnstile_updater == current_thread();
1168 }
1169 
1170 __attribute__((always_inline))
1171 static inline void
1172 workq_perform_turnstile_operation_locked(struct workqueue *wq,
1173     void (^operation)(void))
1174 {
1175 	workq_lock_held(wq);
1176 	wq->wq_turnstile_updater = current_thread();
1177 	operation();
1178 	wq->wq_turnstile_updater = THREAD_NULL;
1179 }
1180 
1181 static void
1182 workq_turnstile_update_inheritor(struct workqueue *wq,
1183     turnstile_inheritor_t inheritor,
1184     turnstile_update_flags_t flags)
1185 {
1186 	if (wq->wq_inheritor == inheritor) {
1187 		return;
1188 	}
1189 	wq->wq_inheritor = inheritor;
1190 	workq_perform_turnstile_operation_locked(wq, ^{
1191 		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
1192 		flags | TURNSTILE_IMMEDIATE_UPDATE);
1193 		turnstile_update_inheritor_complete(wq->wq_turnstile,
1194 		TURNSTILE_INTERLOCK_HELD);
1195 	});
1196 }
1197 
/*
 * Take a running workqueue thread off the run list and park it as idle, or,
 * when there are too many idle threads, send it off to die instead.
 *
 * p:           process owning the workqueue; only forwarded to
 *              workq_unpark_for_death_and_unlock().
 * wq:          the workqueue.  NOTE(review): presumably entered with the workq
 *              lock held, since the death path ends in an "..._and_unlock"
 *              call - confirm against callers.
 * uth:         the thread being parked.
 * setup_flags: forwarded to workq_unpark_for_death_and_unlock().
 *
 * Returns normally when the thread was parked on the new list or the idle
 * list.  Does not return when the thread is chosen to die.
 */
1198 static void
1199 workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
1200     uint32_t setup_flags)
1201 {
1202 	uint64_t now = mach_absolute_time();
1203 	bool is_creator = (uth == wq->wq_creator);
1204 
	/* Undo the per-pool "scheduled" accounting for this thread's type. */
1205 	if (workq_thread_is_cooperative(uth)) {
1206 		assert(!is_creator);
1207 
1208 		thread_qos_t thread_qos = uth->uu_workq_pri.qos_req;
1209 		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);
1210 
1211 		/* Before we get here, we always go through
1212 		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
1213 		 * that we went through the logic in workq_threadreq_select which
1214 		 * did the refresh for the next best cooperative qos while
1215 		 * excluding the current thread - we shouldn't need to do it again.
1216 		 */
1217 		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
1218 	} else if (workq_thread_is_nonovercommit(uth)) {
1219 		assert(!is_creator);
1220 
1221 		wq->wq_constrained_threads_scheduled--;
1222 	}
1223 
	/* The thread is no longer running and no longer belongs to any pool type. */
1224 	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
1225 	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
1226 	wq->wq_threads_scheduled--;
1227 
1228 	if (is_creator) {
1229 		wq->wq_creator = NULL;
1230 		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
1231 		    uth->uu_save.uus_workq_park_data.yields);
1232 	}
1233 
	/*
	 * If this thread was the turnstile inheritor, hand inheritance to the
	 * workqueue itself when requests are pending, or to nobody otherwise.
	 */
1234 	if (wq->wq_inheritor == get_machthread(uth)) {
1235 		assert(wq->wq_creator == NULL);
1236 		if (wq->wq_reqcount) {
1237 			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
1238 		} else {
1239 			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
1240 		}
1241 	}
1242 
	/* A thread still flagged NEW goes back to the new list; nothing else applies. */
1243 	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
1244 		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
1245 		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
1246 		wq->wq_thidlecount++;
1247 		return;
1248 	}
1249 
	/*
	 * Only non-creator threads have their active/scheduled bucket counts
	 * dropped here, and only they get flagged UT_WORKQ_IDLE_CLEANUP.
	 */
1250 	if (!is_creator) {
1251 		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
1252 		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
1253 		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
1254 	}
1255 
1256 	uth->uu_save.uus_workq_park_data.idle_stamp = now;
1257 
1258 	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
1259 	uint16_t cur_idle = wq->wq_thidlecount;
1260 
	/*
	 * Die right away if the idle count has reached wq_max_constrained_threads,
	 * or if no thread is currently dying and the oldest killable idle thread
	 * should be killed now.
	 */
1261 	if (cur_idle >= wq_max_constrained_threads ||
1262 	    (wq->wq_thdying_count == 0 && oldest &&
1263 	    workq_should_kill_idle_thread(wq, oldest, now))) {
1264 		/*
1265 		 * Immediately kill threads if we have too many of them.
1266 		 *
1267 		 * And swap "place" with the oldest one we'd have woken up.
1268 		 * This is a relatively desperate situation where we really
1269 		 * need to kill threads quickly and it's best to kill
1270 		 * the one that's currently on core than context switching.
1271 		 */
1272 		if (oldest) {
1273 			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
1274 			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
1275 			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
1276 		}
1277 
1278 		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
1279 		    wq, cur_idle, 0, 0);
1280 		wq->wq_thdying_count++;
1281 		uth->uu_workq_flags |= UT_WORKQ_DYING;
1282 		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		/* death_flags is 0: uth was never put on an idle list nor counted as idle. */
1283 		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
1284 		__builtin_unreachable();
1285 	}
1286 
	/* Sampled before insertion: NULL means the idle list is currently empty. */
1287 	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
1288 
1289 	cur_idle += 1;
1290 	wq->wq_thidlecount = cur_idle;
1291 
	/*
	 * With at least wq_death_max_load idle threads and a tail that still has
	 * its stack, park at the tail marked as having no stack; otherwise park
	 * at the head marked as having a stack.
	 */
1292 	if (cur_idle >= wq_death_max_load && tail &&
1293 	    tail->uu_save.uus_workq_park_data.has_stack) {
1294 		uth->uu_save.uus_workq_park_data.has_stack = false;
1295 		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
1296 	} else {
1297 		uth->uu_save.uus_workq_park_data.has_stack = true;
1298 		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
1299 	}
1300 
	/* First thread on a previously empty idle list: schedule the death call. */
1301 	if (!tail) {
1302 		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
1303 		workq_death_call_schedule(wq, now + delay);
1304 	}
1305 }
1306 
1307 #pragma mark thread requests
1308 
1309 static inline bool
1310 workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
1311 {
1312 	return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
1313 }
1314 
1315 static inline bool
1316 workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
1317 {
1318 	return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0;
1319 }
1320 
1321 static inline bool
1322 workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
1323 {
1324 	return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
1325 }
1326 
/*
 * Convenience wrappers: apply the workq_tr_is_*() flag predicates to the
 * tr_flags of a thread request.
 */
1327 #define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
1328 #define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
1329 #define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)
1330 
1331 static inline int
1332 workq_priority_for_req(workq_threadreq_t req)
1333 {
1334 	thread_qos_t qos = req->tr_qos;
1335 
1336 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1337 		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
1338 		assert(trp.trp_flags & TRP_PRIORITY);
1339 		return trp.trp_pri;
1340 	}
1341 	return thread_workq_pri_for_qos(qos);
1342 }
1343 
1344 static inline struct priority_queue_sched_max *
1345 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1346 {
1347 	assert(!workq_tr_is_cooperative(req->tr_flags));
1348 
1349 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1350 		return &wq->wq_special_queue;
1351 	} else if (workq_tr_is_overcommit(req->tr_flags)) {
1352 		return &wq->wq_overcommit_queue;
1353 	} else {
1354 		return &wq->wq_constrained_queue;
1355 	}
1356 }
1357 
1358 
1359 /* Calculates the number of threads scheduled >= the input QoS */
1360 static uint64_t
1361 workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos)
1362 {
1363 	workq_lock_held(wq);
1364 
1365 	uint64_t num_cooperative_threads = 0;
1366 
1367 	for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
1368 		uint8_t bucket = _wq_bucket(cur_qos);
1369 		num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
1370 	}
1371 
1372 	return num_cooperative_threads;
1373 }
1374 
1375 static uint64_t
1376 workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
1377 {
1378 	return workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN);
1379 }
1380 
#if DEBUG || DEVELOPMENT
/*
 * Debug helper: true when at least one cooperative bucket, from
 * WORKQ_THREAD_QOS_MAX down to WORKQ_THREAD_QOS_MIN, has a queued request.
 */
static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	thread_qos_t qos = WORKQ_THREAD_QOS_MAX;

	while (qos >= WORKQ_THREAD_QOS_MIN) {
		uint8_t bucket = _wq_bucket(qos);

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			return true;
		}
		qos--;
	}

	return false;
}
#endif
1395 
1396 /*
1397  * Determines the next QoS bucket we should service next in the cooperative
1398  * pool. This function will always return a QoS for cooperative pool as long as
1399  * there are requests to be serviced.
1400  *
1401  * Unlike the other thread pools, for the cooperative thread pool the schedule
1402  * counts for the various buckets in the pool affect the next best request for
1403  * it.
1404  *
1405  * This function is called in the following contexts:
1406  *
1407  * a) When determining the best thread QoS for cooperative bucket for the
1408  * creator/thread reuse
1409  *
1410  * b) Once (a) has happened and thread has bound to a thread request, figuring
1411  * out whether the next best request for this pool has changed so that creator
1412  * can be scheduled.
1413  *
1414  * Returns true if the cooperative queue's best qos changed from previous
1415  * value.
1416  */
1417 static bool
1418 _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
1419 {
1420 	workq_lock_held(wq);
1421 
1422 	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;
1423 
1424 	/* We determine the next best cooperative thread request based on the
1425 	 * following:
1426 	 *
1427 	 * 1. Take the MAX of the following:
1428 	 *		a) Highest qos with pending TRs such that number of scheduled
1429 	 *		threads so far with >= qos is < wq_max_cooperative_threads
1430 	 *		b) Highest qos bucket with pending TRs but no scheduled threads for that bucket
1431 	 *
1432 	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
1433 	 * pending thread requests in the pool.
1434 	 *
1435 	 */
1436 	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
1437 	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;
1438 
1439 	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;
1440 
1441 	int scheduled_count_till_qos = 0;
1442 
1443 	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
1444 		uint8_t bucket = _wq_bucket(qos);
1445 		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
1446 		scheduled_count_till_qos += scheduled_count_for_bucket;
1447 
1448 		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
1449 			if (qos > highest_qos_req) {
1450 				highest_qos_req = qos;
1451 			}
1452 			/*
1453 			 * The pool isn't saturated for threads at and above this QoS, and
1454 			 * this qos bucket has pending requests
1455 			 */
1456 			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
1457 				if (qos > highest_qos_req_with_width) {
1458 					highest_qos_req_with_width = qos;
1459 				}
1460 			}
1461 
1462 			/*
1463 			 * There are no threads scheduled for this bucket but there
1464 			 * is work pending, give it at least 1 thread
1465 			 */
1466 			if (scheduled_count_for_bucket == 0) {
1467 				if (qos > highest_qos_with_no_scheduled) {
1468 					highest_qos_with_no_scheduled = qos;
1469 				}
1470 			}
1471 		}
1472 	}
1473 
1474 	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
1475 	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
1476 		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
1477 	}
1478 
1479 #if DEBUG || DEVELOPMENT
1480 	/* Assert that if we are showing up the next best req as UN, then there
1481 	 * actually is no thread request in the cooperative pool buckets */
1482 	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
1483 		assert(!workq_has_cooperative_thread_requests(wq));
1484 	}
1485 #endif
1486 
1487 	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
1488 }
1489 
1490 /*
1491  * Returns whether or not the input thread (or creator thread if uth is NULL)
1492  * should be allowed to work as part of the cooperative pool for the <input qos>
1493  * bucket.
1494  *
1495  * This function is called in a bunch of places:
1496  *		a) Quantum expires for a thread and it is part of the cooperative pool
1497  *		b) When trying to pick a thread request for the creator thread to
1498  *		represent.
1499  *		c) When a thread is trying to pick a thread request to actually bind to
1500  *		and service.
1501  *
1502  * Called with workq lock held.
1503  */
1504 
/*
 * Reason codes passed as the last argument of the
 * TRACE_wq_cooperative_admission tracepoints below, identifying which of the
 * three admission checks produced the decision.
 */
1505 #define WQ_COOPERATIVE_POOL_UNSATURATED 1
1506 #define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
1507 #define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3
1508 
/*
 * wq:              workqueue whose cooperative pool is consulted.
 * qos:             QoS whose cooperative bucket admission is requested for.
 * uth:             the thread asking, or NULL for the creator.  If it is
 *                  currently a cooperative thread, its own contribution to the
 *                  scheduled count is removed while the checks run and is put
 *                  back before returning.
 * may_start_timer: when true and the final check denies admission,
 *                  workq_schedule_delayed_thread_creation() is called.
 *
 * Returns true if the thread passed admission, false otherwise.
 */
1509 static bool
1510 workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
1511     bool may_start_timer)
1512 {
1513 	workq_lock_held(wq);
1514 
1515 	bool exclude_thread_as_scheduled = false;
1516 	bool passed_admissions = false;
1517 	uint8_t bucket = _wq_bucket(qos);
1518 
	/*
	 * Evaluate the pool as if this thread were not scheduled in it; the
	 * matching increment is at the "out" label.
	 */
1519 	if (uth && workq_thread_is_cooperative(uth)) {
1520 		exclude_thread_as_scheduled = true;
1521 		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
1522 	}
1523 
1524 	/*
1525 	 * We have not saturated the pool yet, let this thread continue
1526 	 */
1527 	uint64_t total_cooperative_threads;
1528 	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
1529 	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
1530 		passed_admissions = true;
1531 		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
1532 		    total_cooperative_threads, qos, passed_admissions,
1533 		    WQ_COOPERATIVE_POOL_UNSATURATED);
1534 		goto out;
1535 	}
1536 
1537 	/*
1538 	 * Without this thread, nothing is servicing the bucket which has pending
1539 	 * work
1540 	 */
1541 	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
1542 	if (bucket_scheduled == 0 &&
1543 	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
1544 		passed_admissions = true;
1545 		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
1546 		    total_cooperative_threads, qos, passed_admissions,
1547 		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
1548 		goto out;
1549 	}
1550 
1551 	/*
1552 	 * If number of threads at the QoS bucket >= input QoS exceeds the max we want
1553 	 * for the pool, deny this thread
1554 	 */
1555 	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos);
1556 	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
1557 	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
1558 	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);
1559 
	/* Only this last check can deny admission, so only it may start the timer. */
1560 	if (!passed_admissions && may_start_timer) {
1561 		workq_schedule_delayed_thread_creation(wq, 0);
1562 	}
1563 
1564 out:
	/* Restore the scheduled count that was temporarily removed on entry. */
1565 	if (exclude_thread_as_scheduled) {
1566 		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
1567 	}
1568 	return passed_admissions;
1569 }
1570 
1571 /*
1572  * returns true if the best request for the pool changed as a result of
1573  * enqueuing this thread request.
1574  */
1575 static bool
1576 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
1577 {
1578 	assert(req->tr_state == WORKQ_TR_STATE_NEW);
1579 
1580 	req->tr_state = WORKQ_TR_STATE_QUEUED;
1581 	wq->wq_reqcount += req->tr_count;
1582 
1583 	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1584 		assert(wq->wq_event_manager_threadreq == NULL);
1585 		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
1586 		assert(req->tr_count == 1);
1587 		wq->wq_event_manager_threadreq = req;
1588 		return true;
1589 	}
1590 
1591 	if (workq_threadreq_is_cooperative(req)) {
1592 		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1593 		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1594 
1595 		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1596 		STAILQ_INSERT_TAIL(bucket, req, tr_link);
1597 
1598 		return _wq_cooperative_queue_refresh_best_req_qos(wq);
1599 	}
1600 
1601 	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);
1602 
1603 	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
1604 	    workq_priority_for_req(req), false);
1605 
1606 	if (priority_queue_insert(q, &req->tr_entry)) {
1607 		if (workq_threadreq_is_nonovercommit(req)) {
1608 			_wq_thactive_refresh_best_constrained_req_qos(wq);
1609 		}
1610 		return true;
1611 	}
1612 	return false;
1613 }
1614 
1615 /*
1616  * returns true if one of the following is true (so as to update creator if
1617  * needed):
1618  *
1619  * (a) the next highest request of the pool we dequeued the request from changed
1620  * (b) the next highest requests of the pool the current thread used to be a
1621  * part of, changed
1622  *
1623  * For overcommit, special and constrained pools, the next highest QoS for each
1624  * pool is just a MAX of pending requests so tracking (a) is sufficient.
1625  *
1626  * But for cooperative thread pool, the next highest QoS for the pool depends on
1627  * schedule counts in the pool as well. So if the current thread used to be
1628  * cooperative in its previous logical run ie (b), then that can also affect
1629  * cooperative pool's next best QoS requests.
1630  */
/*
 * wq:                              workqueue the request is queued on.
 * req:                             request to consume one unit of; it is only
 *                                  unlinked when its tr_count reaches 0.
 * cooperative_sched_count_changed: set by the caller when the cooperative
 *                                  schedule counts changed, see (b) above; must
 *                                  be true when req itself is cooperative.
 *
 * wq_reqcount is always decremented.  When the request still has a non-zero
 * tr_count afterwards, nothing is unlinked or refreshed and false is
 * returned.
 */
1631 static bool
1632 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
1633     bool cooperative_sched_count_changed)
1634 {
1635 	wq->wq_reqcount--;
1636 
1637 	bool next_highest_request_changed = false;
1638 
1639 	if (--req->tr_count == 0) {
1640 		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1641 			assert(wq->wq_event_manager_threadreq == req);
1642 			assert(req->tr_count == 0);
1643 			wq->wq_event_manager_threadreq = NULL;
1644 
1645 			/* If a cooperative thread was the one which picked up the manager
1646 			 * thread request, we need to reevaluate the cooperative pool
1647 			 * anyways.
1648 			 */
1649 			if (cooperative_sched_count_changed) {
1650 				_wq_cooperative_queue_refresh_best_req_qos(wq);
1651 			}
1652 			return true;
1653 		}
1654 
1655 		if (workq_threadreq_is_cooperative(req)) {
1656 			assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1657 			assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1658 			/* Account for the fact that BG and MT are coalesced when
1659 			 * calculating best request for cooperative pool
1660 			 */
1661 			assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos));
1662 
1663 			struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1664 			__assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);
1665 
			/* Cooperative requests are only ever dequeued from the head of their bucket. */
1666 			assert(head == req);
1667 			STAILQ_REMOVE_HEAD(bucket, tr_link);
1668 
1669 			/*
1670 			 * If the request we're dequeueing is cooperative, then the sched
1671 			 * counts definitely changed.
1672 			 */
1673 			assert(cooperative_sched_count_changed);
1674 		}
1675 
1676 		/*
1677 		 * We want to do the cooperative pool refresh after dequeueing a
1678 		 * cooperative thread request if any (to combine both effects into 1
1679 		 * refresh operation)
1680 		 */
1681 		if (cooperative_sched_count_changed) {
1682 			next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
1683 		}
1684 
1685 		if (!workq_threadreq_is_cooperative(req)) {
1686 			/*
1687 			 * All other types of requests are enqueued in priority queues
1688 			 */
1689 
1690 			if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1691 			    &req->tr_entry)) {
1692 				next_highest_request_changed |= true;
1693 				if (workq_threadreq_is_nonovercommit(req)) {
1694 					_wq_thactive_refresh_best_constrained_req_qos(wq);
1695 				}
1696 			}
1697 		}
1698 	}
1699 
1700 	return next_highest_request_changed;
1701 }
1702 
1703 static void
1704 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1705 {
1706 	req->tr_state = WORKQ_TR_STATE_CANCELED;
1707 	if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
1708 		kqueue_threadreq_cancel(p, req);
1709 	} else {
1710 		zfree(workq_zone_threadreq, req);
1711 	}
1712 }
1713 
1714 #pragma mark workqueue thread creation thread calls
1715 
/*
 * Atomically claim the right to arm one of the thread-creation thread calls.
 *
 * wq:        workqueue whose wq_flags are updated.
 * sched:     flag to set when the call can be armed now.
 * pend:      flag to set instead when the process is suspended
 *            (WQ_PROC_SUSPENDED is present in wq_flags).
 * fail_mask: additional flags that, if already set, make the attempt fail.
 *
 * Nothing is modified and false is returned if WQ_EXITING, `sched`, `pend`
 * or any bit of `fail_mask` is already set.  Otherwise exactly one of
 * `pend` / `sched` is set as described above, and the return value is true
 * only when the process was not suspended, i.e. when `sched` was the flag
 * that got set.
 */
1716 static inline bool
1717 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1718     uint32_t fail_mask)
1719 {
1720 	uint32_t old_flags, new_flags;
1721 
1722 	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1723 		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1724 		        os_atomic_rmw_loop_give_up(return false);
1725 		}
1726 		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1727 		        new_flags = old_flags | pend;
1728 		} else {
1729 		        new_flags = old_flags | sched;
1730 		}
1731 	});
1732 
	/* old_flags holds the value the successful update was based on. */
1733 	return (old_flags & WQ_PROC_SUSPENDED) == 0;
1734 }
1735 
1736 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1737 
1738 static bool
1739 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1740 {
1741 	assert(!preemption_enabled());
1742 
1743 	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1744 	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1745 	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
1746 		return false;
1747 	}
1748 
1749 	uint64_t now = mach_absolute_time();
1750 
1751 	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1752 		/* do not change the window */
1753 	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1754 		wq->wq_timer_interval *= 2;
1755 		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1756 			wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
1757 		}
1758 	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1759 		wq->wq_timer_interval /= 2;
1760 		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1761 			wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
1762 		}
1763 	}
1764 
1765 	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1766 	    _wq_flags(wq), wq->wq_timer_interval);
1767 
1768 	thread_call_t call = wq->wq_delayed_call;
1769 	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1770 	uint64_t deadline = now + wq->wq_timer_interval;
1771 	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1772 		panic("delayed_call was already enqueued");
1773 	}
1774 	return true;
1775 }
1776 
1777 static void
1778 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1779 {
1780 	assert(!preemption_enabled());
1781 
1782 	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1783 	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
1784 		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1785 		    _wq_flags(wq), 0);
1786 
1787 		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1788 		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1789 			panic("immediate_call was already enqueued");
1790 		}
1791 	}
1792 }
1793 
1794 void
1795 workq_proc_suspended(struct proc *p)
1796 {
1797 	struct workqueue *wq = proc_get_wqptr(p);
1798 
1799 	if (wq) {
1800 		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1801 	}
1802 }
1803 
1804 void
1805 workq_proc_resumed(struct proc *p)
1806 {
1807 	struct workqueue *wq = proc_get_wqptr(p);
1808 	uint32_t wq_flags;
1809 
1810 	if (!wq) {
1811 		return;
1812 	}
1813 
1814 	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
1815 	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
1816 	if ((wq_flags & WQ_EXITING) == 0) {
1817 		disable_preemption();
1818 		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1819 			workq_schedule_immediate_thread_creation(wq);
1820 		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1821 			workq_schedule_delayed_thread_creation(wq,
1822 			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1823 		}
1824 		enable_preemption();
1825 	}
1826 }
1827 
1828 /**
1829  * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1830  */
1831 static bool
1832 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1833 {
1834 	uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1835 	if (now <= lastblocked_ts) {
1836 		/*
1837 		 * Because the update of the timestamp when a thread blocks
1838 		 * isn't serialized against us looking at it (i.e. we don't hold
1839 		 * the workq lock), it's possible to have a timestamp that matches
1840 		 * the current time or that even looks to be in the future relative
1841 		 * to when we grabbed the current time...
1842 		 *
1843 		 * Just treat this as a busy thread since it must have just blocked.
1844 		 */
1845 		return true;
1846 	}
1847 	return (now - lastblocked_ts) < wq_stalled_window.abstime;
1848 }
1849 
/*
 * Thread call handler for the workqueue "add threads" calls.
 *
 * _p is the proc whose workqueue should be serviced, and flags carries the
 * wq_flags bit identifying which call fired: it must be either
 * WQ_DELAYED_CALL_SCHEDULED or WQ_IMMEDIATE_CALL_SCHEDULED.
 *
 * Under the workqueue lock, this records when the call last ran, clears the
 * corresponding "scheduled" bit, and lets workq_schedule_creator() decide
 * whether threads need to be created.
 */
static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	/* the flag is smuggled through the pointer-sized thread call argument */
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	/* the call is no longer pending: allow it to be scheduled again */
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount);
}
1884 
1885 #pragma mark thread state tracking
1886 
/*
 * Callback tracking workqueue threads blocking and unblocking.
 *
 * type is SCHED_CALL_BLOCK or SCHED_CALL_UNBLOCK, thread is the thread
 * changing state. Threads in the manager bucket are ignored.
 *
 * On block, the active count of the thread's QoS bucket is decremented, the
 * bucket's last-blocked timestamp is refreshed, and a delayed thread creation
 * is scheduled when the blocking thread may have been what kept a pending
 * constrained request from being admitted.
 *
 * On unblock, the active count of the bucket is incremented.
 *
 * Neither path takes the workqueue lock; all updates are done with atomics.
 */
static void
workq_sched_callback(int type, thread_t thread)
{
	thread_ro_t tro = get_thread_ro(thread);
	struct uthread *uth = get_bsdthread_info(thread);
	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		/* old_thactive is the value returned by the decrement */
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket, it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem.  Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		/* tracing only: the aggregate below has no effect on the decision */
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		/* tracing only */
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled);
		}
		break;
	}
}
1979 
1980 #pragma mark workq lifecycle
1981 
/*
 * Takes an additional reference on the workqueue (wq_refcnt).
 */
void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}
1987 
/*
 * Final teardown of a workqueue.
 *
 * e is the wq_destroy_link embedded in the workqueue being destroyed, and dq
 * is only used to assert that the element came through
 * workq_deallocate_queue.
 *
 * Completes and deallocates the workqueue turnstile, destroys the workqueue
 * lock, and returns the structure to workq_zone_workqueue.
 */
static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
	struct workqueue *wq;
	struct turnstile *ts;

	/* recover the workqueue from its embedded queue linkage */
	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
	assert(dq == &workq_deallocate_queue);

	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}
2006 
2007 static void
2008 workq_deallocate(struct workqueue *wq)
2009 {
2010 	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
2011 		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
2012 		    &workq_deallocate_queue);
2013 	}
2014 }
2015 
2016 void
2017 workq_deallocate_safe(struct workqueue *wq)
2018 {
2019 	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
2020 		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
2021 		    MPSC_QUEUE_DISABLE_PREEMPTION);
2022 	}
2023 }
2024 
/**
 * Setup per-process state for the workqueue.
 *
 * Returns EINVAL when the process does not have P_LREGISTER set in p_lflag,
 * and 0 otherwise (including when the process already has a workqueue).
 *
 * The first call that finds wq_init_constrained_limit set also computes the
 * global limits (constrained/cooperative pool sizes, thread caps, per QoS
 * parallelism) and then clears that flag.
 *
 * uap and retval are unused.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_wait_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		/* only ever raise the limit, never lower it */
		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		/* clamp the thread cap to both upper bounds below */
		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_max_cooperative_threads = num_cpus;

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		/*
		 * A FALSE return means this thread is not the one doing the
		 * initialization: the workqueue pointer is expected to be set by
		 * the time we get here.
		 */
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
		wq->wq_proc = p;
		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
		    TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue);
		priority_queue_init(&wq->wq_constrained_queue);
		priority_queue_init(&wq->wq_special_queue);
		for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
			STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
		}

		/* We are only using the delayed thread call for the constrained pool
		 * which can't have work at >= UI QoS and so we can be fine with a
		 * UI QoS thread call.
		 */
		wq->wq_delayed_call = thread_call_allocate_with_qos(
			workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
			workq_kill_old_threads_call, wq,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_ticket_init(&wq->wq_lock, &workq_lck_grp);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);
		/* publish the workqueue only once it is fully set up */
		proc_set_wqptr(p, wq);
	}
out:

	return error;
}
2125 
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 *		Sets WQ_EXITING (panics if it was already set), cancels the
 *		thread calls that were flagged as scheduled, cancels the event
 *		manager thread request if there is one, and destroys every
 *		thread request still sitting in the overcommit, constrained
 *		and special priority queues.
 *
 *		Does nothing if the process has no workqueue.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);

	workq_lock_spin(wq);

	/* wq_flags holds the flags as they were before WQ_EXITING got set */
	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	/* detach the manager request under the lock, cancel it once unlocked */
	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
}
2198 
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 *		Atomically detaches the workqueue from the process, waits for
 *		and frees its three thread calls, clears the sched call and
 *		drops the thread reference of every thread left on the run
 *		list, then releases the workqueue reference.
 *
 *		Does nothing if the process has no workqueue.
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	/* from now on proc_get_wqptr(p) callers see no workqueue */
	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_t mth = get_machthread(uth);
			thread_sched_call(mth, NULL);
			thread_deallocate(mth);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);

		/* may free wq: it must not be touched past this point */
		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
	}
}
2266 
2267 
2268 #pragma mark bsd thread control
2269 
2270 bool
2271 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2272 {
2273 	return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2274 	       (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER);
2275 }
2276 
2277 static bool
2278 _pthread_priority_to_policy(pthread_priority_t priority,
2279     thread_qos_policy_data_t *data)
2280 {
2281 	data->qos_tier = _pthread_priority_thread_qos(priority);
2282 	data->tier_importance = _pthread_priority_relpri(priority);
2283 	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2284 	    data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2285 		return false;
2286 	}
2287 	return true;
2288 }
2289 
/*
 * Applies a set of changes to the calling thread, as selected by flags:
 *
 *  - WORKQ_SET_SELF_WQ_KEVENT_UNBIND: unbind the thread from the kevent
 *    thread request it is bound to;
 *  - WORKQ_SET_SELF_QOS_FLAG (optionally with
 *    WORKQ_SET_SELF_QOS_OVERRIDE_FLAG): change the thread QoS to the one
 *    encoded in priority;
 *  - WORKQ_SET_SELF_VOUCHER_FLAG: adopt the voucher named by voucher;
 *  - WORKQ_SET_SELF_FIXEDPRIORITY_FLAG / WORKQ_SET_SELF_TIMESHARE_FLAG:
 *    switch the thread to fixed priority or timeshare.
 *
 * th must be current_thread().
 *
 * The steps run in the order above and a failing step does not prevent the
 * next ones from being attempted, except that the fixed priority/timeshare
 * step is skipped when the QoS step failed.
 *
 * Returns 0 on success. Otherwise returns EBADMSG when both the QoS and
 * voucher steps failed, else the error of the first failing step in the
 * order: unbind, QoS, voucher, fixed priority/timeshare.
 */
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	/* one error slot per step, combined at the done: label */
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	assert(th == current_thread());
	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) {
		/* the override flag is only meaningful together with the QoS flag */
		assert(flags & WORKQ_SET_SELF_QOS_FLAG);

		thread_qos_policy_data_t new_policy;
		thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) {
			/*
			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely
			 * should have an override QoS in the pthread_priority_t and we should
			 * only come into this path for cooperative thread requests
			 */
			if (!_pthread_priority_has_override_qos(priority) ||
			    !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}
			qos_override = _pthread_priority_thread_override_qos(priority);
		} else {
			/*
			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely
			 * should not have an override QoS in the pthread_priority_t
			 */
			if (_pthread_priority_has_override_qos(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 *
			 * Transitions allowed:
			 *
			 * overcommit --> non-overcommit
			 * overcommit --> overcommit
			 * non-overcommit --> non-overcommit
			 * non-overcommit --> overcommit (to be deprecated later)
			 * cooperative --> cooperative
			 *
			 * All other transitions aren't allowed so reject them.
			 */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}

			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			if (qos_override) {
				/*
				 * We're in the case of a thread clarifying that it is for eg. not IN
				 * req QoS but rather, UT req QoS with IN override. However, this can
				 * race with a concurrent override happening to the thread via
				 * workq_thread_add_dispatch_override so this needs to be
				 * synchronized with the thread mutex.
				 */
				thread_mtx_lock(th);
			}

			workq_lock_spin(wq);

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;

			if (old_pri.qos_override < qos_override) {
				/*
				 * Since this can race with a concurrent override via
				 * workq_thread_add_dispatch_override, only adjust override value if we
				 * are higher - this is a saturating function.
				 *
				 * We should not be changing the final override values, we should simply
				 * be redistributing the current value with a different breakdown of req
				 * vs override QoS - assert to that effect. Therefore, buckets should
				 * not change.
				 */
				new_pri.qos_override = qos_override;
				assert(workq_pri_override(new_pri) == workq_pri_override(old_pri));
				assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri));
			}

			/* Adjust schedule counts for various types of transitions */

			/* overcommit -> non-overcommit */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
				workq_thread_set_type(uth, 0);
				wq->wq_constrained_threads_scheduled++;

				/* non-overcommit -> overcommit */
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
				workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
				/*
				 * The constrained pool was at its limit before this thread
				 * left it: force a pass through the creator.
				 */
				force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);

				/* cooperative -> cooperative */
			} else if (workq_thread_is_cooperative(uth)) {
				_wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req);
				_wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req);

				/* We're changing schedule counts within cooperative pool, we
				 * need to refresh best cooperative QoS logic again */
				force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
			}

			/*
			 * This will set up an override on the thread if any and will also call
			 * schedule_creator if needed
			 */
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);

			if (qos_override) {
				thread_mtx_unlock(th);
			}

			if (workq_thread_is_overcommit(uth)) {
				thread_disarm_workqueue_quantum(th);
			} else {
				/* If the thread changed QoS buckets, the quantum duration
				 * may have changed too */
				thread_arm_workqueue_quantum(th);
			}
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	/* a failed QoS change skips the fixed priority/timeshare step */
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	/* otherwise report the first failure, in step order */
	if (unbind_rv) {
		return unbind_rv;
	}

	if (qos_rv) {
		return qos_rv;
	}

	if (voucher_rv) {
		return voucher_rv;
	}

	if (fixedpri_rv) {
		return fixedpri_rv;
	}


	return 0;
}
2557 
2558 static int
2559 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2560     pthread_priority_t pp, user_addr_t resource)
2561 {
2562 	thread_qos_t qos = _pthread_priority_thread_qos(pp);
2563 	if (qos == THREAD_QOS_UNSPECIFIED) {
2564 		return EINVAL;
2565 	}
2566 
2567 	thread_t th = port_name_to_thread(kport,
2568 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2569 	if (th == THREAD_NULL) {
2570 		return ESRCH;
2571 	}
2572 
2573 	int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE,
2574 	    resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2575 
2576 	thread_deallocate(th);
2577 	return rv;
2578 }
2579 
2580 static int
2581 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2582     user_addr_t resource)
2583 {
2584 	thread_t th = port_name_to_thread(kport,
2585 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2586 	if (th == THREAD_NULL) {
2587 		return ESRCH;
2588 	}
2589 
2590 	int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource,
2591 	    THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2592 
2593 	thread_deallocate(th);
2594 	return rv;
2595 }
2596 
/*
 * Applies a dispatch QoS override to a workqueue thread.
 *
 * kport names the target thread in the current task, pp encodes the override
 * QoS. When ulock_addr is non-zero, the 32-bit value at that user address is
 * read and the override is skipped if the read succeeded and the owner it
 * encodes is not kport.
 *
 * The override only ever raises the thread's current qos_override.
 *
 * Returns EINVAL if pp carries no QoS, ESRCH if kport does not resolve to a
 * thread, EPERM if the thread is not a workqueue thread, and 0 otherwise
 * (including when the override was skipped).
 */
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport,
	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp);

	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
		 */
		disable_preemption();
		rc = copyin_atomic32(ulock_addr, &val);
		enable_preemption();
		/* a failed copyin does not prevent the override from being applied */
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		/* other thread: record the override without a bucket update */
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}
2664 
/*
 * Clears the dispatch QoS override of a workqueue thread by resetting its
 * qos_override to THREAD_QOS_UNSPECIFIED.
 *
 * Returns EPERM if thread is not a workqueue thread, 0 otherwise.
 */
static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);
	struct uthread *uth = get_bsdthread_info(thread);

	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);

	/*
	 * workq_thread_add_dispatch_override takes the thread mutex before doing the
	 * copyin to validate the drainer and apply the override. We need to do the
	 * same here. See rdar://84472518
	 */
	thread_mtx_lock(thread);

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);

	thread_mtx_unlock(thread);
	return 0;
}
2694 
2695 static int
2696 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2697 {
2698 	if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2699 		// If the thread isn't a workqueue thread, don't set the
2700 		// kill_allowed bit; however, we still need to return 0
2701 		// instead of an error code since this code is executed
2702 		// on the abort path which needs to not depend on the
2703 		// pthread_t (returning an error depends on pthread_t via
2704 		// cerror_nocancel)
2705 		return 0;
2706 	}
2707 	struct uthread *uth = get_bsdthread_info(thread);
2708 	uth->uu_workq_pthread_kill_allowed = enable;
2709 	return 0;
2710 }
2711 
2712 static int
2713 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2714     int *retval)
2715 {
2716 	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2717 	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2718 	static_assert(QOS_PARALLELISM_REALTIME ==
2719 	    _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2720 	static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2721 	    _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2722 
2723 	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2724 		return EINVAL;
2725 	}
2726 
2727 	/* No units are present */
2728 	if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2729 		return ENOTSUP;
2730 	}
2731 
2732 	if (flags & QOS_PARALLELISM_REALTIME) {
2733 		if (qos) {
2734 			return EINVAL;
2735 		}
2736 	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2737 		return EINVAL;
2738 	}
2739 
2740 	*retval = qos_max_parallelism(qos, flags);
2741 	return 0;
2742 }
2743 
2744 static int
2745 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2746     unsigned long flags, uint64_t value1, __unused uint64_t value2)
2747 {
2748 	uint32_t apply_worker_index;
2749 	kern_return_t kr;
2750 
2751 	switch (flags) {
2752 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2753 		apply_worker_index = (uint32_t)value1;
2754 		kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2755 		/*
2756 		 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2757 		 * cluster which it was not eligible to execute on.
2758 		 */
2759 		return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2760 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2761 		kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2762 		return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2763 	default:
2764 		return EINVAL;
2765 	}
2766 }
2767 
/*
 * Makes the enclosing function return EINVAL when arg is non-zero. Used to
 * reject a value passed in an argument that a command does not use.
 *
 * Note that this expands to a statement containing a `return`, so it can
 * only be used in functions returning an int error code.
 */
#define ENSURE_UNUSED(arg) \
	        ({ if ((arg) != 0) { return EINVAL; } })
2770 
/*
 * Dispatches a bsdthread_ctl request to its handler based on uap->cmd.
 *
 * uap->arg1 through uap->arg3 are interpreted per command. Commands that do
 * not use one of them reject a non-zero value with EINVAL (ENSURE_UNUSED).
 * retval is only written by BSDTHREAD_CTL_QOS_MAX_PARALLELISM.
 *
 * Returns the handler's result, ENOTSUP for commands that are no longer
 * supported, and EINVAL for unknown commands.
 */
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		/* always operates on the calling thread, arguments are not looked at */
		return workq_thread_reset_dispatch_override(p, current_thread());

	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
	case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
		return bsdthread_dispatch_apply_attr(p, current_thread(),
		           (unsigned long)uap->arg1, (uint64_t)uap->arg2,
		           (uint64_t)uap->arg3);
	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
2816 
2817 #pragma mark workqueue thread manipulation
2818 
/*
 * Forward declarations of the select/park/run routines used by the code
 * below. All three are declared __dead2.
 */

/* Called by workq_park_and_unlock() when the thread was already made runnable. */
static void __dead2
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

/* Called at the end of workq_thread_return() with the workqueue lock held. */
static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2828 
2829 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2830 static inline uint64_t
2831 workq_trace_req_id(workq_threadreq_t req)
2832 {
2833 	struct kqworkloop *kqwl;
2834 	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2835 		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2836 		return kqwl->kqwl_dynamicid;
2837 	}
2838 
2839 	return VM_KERNEL_ADDRHIDE(req);
2840 }
2841 #endif
2842 
2843 /**
2844  * Entry point for libdispatch to ask for threads
2845  */
2846 static int
2847 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
2848 {
2849 	thread_qos_t qos = _pthread_priority_thread_qos(pp);
2850 	struct workqueue *wq = proc_get_wqptr(p);
2851 	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2852 	int ret = 0;
2853 
2854 	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2855 	    qos == THREAD_QOS_UNSPECIFIED) {
2856 		ret = EINVAL;
2857 		goto exit;
2858 	}
2859 
2860 	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2861 	    wq, reqcount, pp, cooperative);
2862 
2863 	workq_threadreq_t req = zalloc(workq_zone_threadreq);
2864 	priority_queue_entry_init(&req->tr_entry);
2865 	req->tr_state = WORKQ_TR_STATE_NEW;
2866 	req->tr_qos   = qos;
2867 	workq_tr_flags_t tr_flags = 0;
2868 
2869 	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2870 		tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
2871 		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2872 	}
2873 
2874 	if (cooperative) {
2875 		tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
2876 		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
2877 
2878 		if (reqcount > 1) {
2879 			ret = ENOTSUP;
2880 			goto free_and_exit;
2881 		}
2882 	}
2883 
2884 	/* A thread request cannot be both overcommit and cooperative */
2885 	if (workq_tr_is_cooperative(tr_flags) &&
2886 	    workq_tr_is_overcommit(tr_flags)) {
2887 		ret = EINVAL;
2888 		goto free_and_exit;
2889 	}
2890 	req->tr_flags = tr_flags;
2891 
2892 	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2893 	    wq, workq_trace_req_id(req), req->tr_qos, reqcount);
2894 
2895 	workq_lock_spin(wq);
2896 	do {
2897 		if (_wq_exiting(wq)) {
2898 			goto unlock_and_exit;
2899 		}
2900 
2901 		/*
2902 		 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
2903 		 * threads without pacing, to inform the scheduler of that workload.
2904 		 *
2905 		 * The last requests, or the ones that failed the admission checks are
2906 		 * enqueued and go through the regular creator codepath.
2907 		 *
2908 		 * If there aren't enough threads, add one, but re-evaluate everything
2909 		 * as conditions may now have changed.
2910 		 */
2911 		unpaced = reqcount - 1;
2912 
2913 		if (reqcount > 1) {
2914 			/* We don't handle asking for parallelism on the cooperative
2915 			 * workqueue just yet */
2916 			assert(!workq_threadreq_is_cooperative(req));
2917 
2918 			if (workq_threadreq_is_nonovercommit(req)) {
2919 				unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2920 				if (unpaced >= reqcount - 1) {
2921 					unpaced = reqcount - 1;
2922 				}
2923 			}
2924 		}
2925 
2926 		/*
2927 		 * This path does not currently handle custom workloop parameters
2928 		 * when creating threads for parallelism.
2929 		 */
2930 		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));
2931 
2932 		/*
2933 		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2934 		 */
2935 		while (unpaced > 0 && wq->wq_thidlecount) {
2936 			struct uthread *uth;
2937 			bool needs_wakeup;
2938 			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;
2939 
2940 			if (workq_tr_is_overcommit(req->tr_flags)) {
2941 				uu_flags |= UT_WORKQ_OVERCOMMIT;
2942 			}
2943 
2944 			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);
2945 
2946 			_wq_thactive_inc(wq, qos);
2947 			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2948 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
2949 			wq->wq_fulfilled++;
2950 
2951 			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2952 			uth->uu_save.uus_workq_park_data.thread_request = req;
2953 			if (needs_wakeup) {
2954 				workq_thread_wakeup(uth);
2955 			}
2956 			unpaced--;
2957 			reqcount--;
2958 		}
2959 	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
2960 	    workq_add_new_idle_thread(p, wq));
2961 
2962 	if (_wq_exiting(wq)) {
2963 		goto unlock_and_exit;
2964 	}
2965 
2966 	req->tr_count = (uint16_t)reqcount;
2967 	if (workq_threadreq_enqueue(wq, req)) {
2968 		/* This can drop the workqueue lock, and take it again */
2969 		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2970 	}
2971 	workq_unlock(wq);
2972 	return 0;
2973 
2974 unlock_and_exit:
2975 	workq_unlock(wq);
2976 free_and_exit:
2977 	zfree(workq_zone_threadreq, req);
2978 exit:
2979 	return ret;
2980 }
2981 
2982 bool
2983 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
2984     struct turnstile *workloop_ts, thread_qos_t qos,
2985     workq_kern_threadreq_flags_t flags)
2986 {
2987 	struct workqueue *wq = proc_get_wqptr_fast(p);
2988 	struct uthread *uth = NULL;
2989 
2990 	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));
2991 
2992 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
2993 		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
2994 		qos = thread_workq_qos_for_pri(trp.trp_pri);
2995 		if (qos == THREAD_QOS_UNSPECIFIED) {
2996 			qos = WORKQ_THREAD_QOS_ABOVEUI;
2997 		}
2998 	}
2999 
3000 	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
3001 	priority_queue_entry_init(&req->tr_entry);
3002 	req->tr_count = 1;
3003 	req->tr_state = WORKQ_TR_STATE_NEW;
3004 	req->tr_qos   = qos;
3005 
3006 	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
3007 	    workq_trace_req_id(req), qos, 1);
3008 
3009 	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
3010 		/*
3011 		 * we're called back synchronously from the context of
3012 		 * kqueue_threadreq_unbind from within workq_thread_return()
3013 		 * we can try to match up this thread with this request !
3014 		 */
3015 		uth = current_uthread();
3016 		assert(uth->uu_kqr_bound == NULL);
3017 	}
3018 
3019 	workq_lock_spin(wq);
3020 	if (_wq_exiting(wq)) {
3021 		req->tr_state = WORKQ_TR_STATE_IDLE;
3022 		workq_unlock(wq);
3023 		return false;
3024 	}
3025 
3026 	if (uth && workq_threadreq_admissible(wq, uth, req)) {
3027 		/* This is the case of the rebind - we were about to park and unbind
3028 		 * when more events came so keep the binding.
3029 		 */
3030 		assert(uth != wq->wq_creator);
3031 
3032 		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
3033 			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
3034 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
3035 		}
3036 		/*
3037 		 * We're called from workq_kern_threadreq_initiate()
3038 		 * due to an unbind, with the kq req held.
3039 		 */
3040 		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3041 		    workq_trace_req_id(req), req->tr_flags, 0);
3042 		wq->wq_fulfilled++;
3043 
3044 		kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
3045 	} else {
3046 		if (workloop_ts) {
3047 			workq_perform_turnstile_operation_locked(wq, ^{
3048 				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
3049 				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
3050 				turnstile_update_inheritor_complete(workloop_ts,
3051 				TURNSTILE_INTERLOCK_HELD);
3052 			});
3053 		}
3054 
3055 		bool reevaluate_creator_thread_group = false;
3056 #if CONFIG_PREADOPT_TG
3057 		reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3058 #endif
3059 		/* We enqueued the highest priority item or we may need to reevaluate if
3060 		 * the creator needs a thread group pre-adoption */
3061 		if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
3062 			workq_schedule_creator(p, wq, flags);
3063 		}
3064 	}
3065 
3066 	workq_unlock(wq);
3067 
3068 	return true;
3069 }
3070 
3071 void
3072 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
3073     thread_qos_t qos, workq_kern_threadreq_flags_t flags)
3074 {
3075 	struct workqueue *wq = proc_get_wqptr_fast(p);
3076 	bool make_overcommit = false;
3077 
3078 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3079 		/* Requests outside-of-QoS shouldn't accept modify operations */
3080 		return;
3081 	}
3082 
3083 	workq_lock_spin(wq);
3084 
3085 	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3086 	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));
3087 
3088 	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3089 		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
3090 		workq_unlock(wq);
3091 		return;
3092 	}
3093 
3094 	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
3095 		/* TODO (rokhinip): We come into this code path for kqwl thread
3096 		 * requests. kqwl requests cannot be cooperative.
3097 		 */
3098 		assert(!workq_threadreq_is_cooperative(req));
3099 
3100 		make_overcommit = workq_threadreq_is_nonovercommit(req);
3101 	}
3102 
3103 	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
3104 		workq_unlock(wq);
3105 		return;
3106 	}
3107 
3108 	assert(req->tr_count == 1);
3109 	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3110 		panic("Invalid thread request (%p) state %d", req, req->tr_state);
3111 	}
3112 
3113 	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
3114 	    workq_trace_req_id(req), qos, 0);
3115 
3116 	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
3117 	workq_threadreq_t req_max;
3118 
3119 	/*
3120 	 * Stage 1: Dequeue the request from its priority queue.
3121 	 *
3122 	 * If we dequeue the root item of the constrained priority queue,
3123 	 * maintain the best constrained request qos invariant.
3124 	 */
3125 	if (priority_queue_remove(pq, &req->tr_entry)) {
3126 		if (workq_threadreq_is_nonovercommit(req)) {
3127 			_wq_thactive_refresh_best_constrained_req_qos(wq);
3128 		}
3129 	}
3130 
3131 	/*
3132 	 * Stage 2: Apply changes to the thread request
3133 	 *
3134 	 * If the item will not become the root of the priority queue it belongs to,
3135 	 * then we need to wait in line, just enqueue and return quickly.
3136 	 */
3137 	if (__improbable(make_overcommit)) {
3138 		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
3139 		pq = workq_priority_queue_for_req(wq, req);
3140 	}
3141 	req->tr_qos = qos;
3142 
3143 	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
3144 	if (req_max && req_max->tr_qos >= qos) {
3145 		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
3146 		    workq_priority_for_req(req), false);
3147 		priority_queue_insert(pq, &req->tr_entry);
3148 		workq_unlock(wq);
3149 		return;
3150 	}
3151 
3152 	/*
3153 	 * Stage 3: Reevaluate whether we should run the thread request.
3154 	 *
3155 	 * Pretend the thread request is new again:
3156 	 * - adjust wq_reqcount to not count it anymore.
3157 	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
3158 	 *   properly attempts a synchronous bind)
3159 	 */
3160 	wq->wq_reqcount--;
3161 	req->tr_state = WORKQ_TR_STATE_NEW;
3162 
3163 	/* We enqueued the highest priority item or we may need to reevaluate if
3164 	 * the creator needs a thread group pre-adoption if the request got a new TG */
3165 	bool reevaluate_creator_tg = false;
3166 
3167 #if CONFIG_PREADOPT_TG
3168 	reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3169 #endif
3170 
3171 	if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
3172 		workq_schedule_creator(p, wq, flags);
3173 	}
3174 	workq_unlock(wq);
3175 }
3176 
/* Acquire, via workq_lock_spin(), the lock of the workqueue attached to `p`. */
void
workq_kern_threadreq_lock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
}
3182 
/* Release the lock of the workqueue attached to `p`. */
void
workq_kern_threadreq_unlock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_unlock(wq);
}
3188 
3189 void
3190 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
3191     thread_t owner, struct turnstile *wl_ts,
3192     turnstile_update_flags_t flags)
3193 {
3194 	struct workqueue *wq = proc_get_wqptr_fast(p);
3195 	turnstile_inheritor_t inheritor;
3196 
3197 	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3198 	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
3199 	workq_lock_held(wq);
3200 
3201 	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3202 		kqueue_threadreq_bind(p, req, req->tr_thread,
3203 		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
3204 		return;
3205 	}
3206 
3207 	if (_wq_exiting(wq)) {
3208 		inheritor = TURNSTILE_INHERITOR_NULL;
3209 	} else {
3210 		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3211 			panic("Invalid thread request (%p) state %d", req, req->tr_state);
3212 		}
3213 
3214 		if (owner) {
3215 			inheritor = owner;
3216 			flags |= TURNSTILE_INHERITOR_THREAD;
3217 		} else {
3218 			inheritor = wq->wq_turnstile;
3219 			flags |= TURNSTILE_INHERITOR_TURNSTILE;
3220 		}
3221 	}
3222 
3223 	workq_perform_turnstile_operation_locked(wq, ^{
3224 		turnstile_update_inheritor(wl_ts, inheritor, flags);
3225 	});
3226 }
3227 
3228 void
3229 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3230 {
3231 	struct workqueue *wq = proc_get_wqptr_fast(p);
3232 
3233 	workq_lock_spin(wq);
3234 	workq_schedule_creator(p, wq, flags);
3235 	workq_unlock(wq);
3236 }
3237 
3238 /*
3239  * Always called at AST by the thread on itself
3240  *
3241  * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
3242  * on what the thread should do next. The TSD value is always set by the thread
3243  * on itself in the kernel and cleared either by userspace when it acks the TSD
3244  * value and takes action, or by the thread in the kernel when the quantum
3245  * expires again.
3246  */
3247 void
3248 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
3249 {
3250 	struct uthread *uth = get_bsdthread_info(thread);
3251 
3252 	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3253 		return;
3254 	}
3255 
3256 	if (!thread_supports_cooperative_workqueue(thread)) {
3257 		panic("Quantum expired for thread that doesn't support cooperative workqueue");
3258 	}
3259 
3260 	thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
3261 	if (qos == THREAD_QOS_UNSPECIFIED) {
3262 		panic("Thread should not have workq bucket of QoS UN");
3263 	}
3264 
3265 	assert(thread_has_expired_workqueue_quantum(thread, false));
3266 
3267 	struct workqueue *wq = proc_get_wqptr(proc);
3268 	assert(wq != NULL);
3269 
3270 	/*
3271 	 * For starters, we're just going to evaluate and see if we need to narrow
3272 	 * the pool and tell this thread to park if needed. In the future, we'll
3273 	 * evaluate and convey other workqueue state information like needing to
3274 	 * pump kevents, etc.
3275 	 */
3276 	uint64_t flags = 0;
3277 
3278 	workq_lock_spin(wq);
3279 
3280 	if (workq_thread_is_cooperative(uth)) {
3281 		if (!workq_cooperative_allowance(wq, qos, uth, false)) {
3282 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3283 		} else {
3284 			/* In the future, when we have kevent hookups for the cooperative
3285 			 * pool, we need fancier logic for what userspace should do. But
3286 			 * right now, only userspace thread requests exist - so we'll just
3287 			 * tell userspace to shuffle work items */
3288 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
3289 		}
3290 	} else if (workq_thread_is_nonovercommit(uth)) {
3291 		if (!workq_constrained_allowance(wq, qos, uth, false)) {
3292 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3293 		}
3294 	}
3295 	workq_unlock(wq);
3296 
3297 	WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);
3298 
3299 	kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);
3300 
3301 	/* We have conveyed to userspace about what it needs to do upon quantum
3302 	 * expiry, now rearm the workqueue quantum again */
3303 	thread_arm_workqueue_quantum(get_machthread(uth));
3304 }
3305 
3306 void
3307 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3308 {
3309 	if (locked) {
3310 		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3311 	} else {
3312 		workq_schedule_immediate_thread_creation(wq);
3313 	}
3314 }
3315 
3316 static int
3317 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
3318     struct workqueue *wq)
3319 {
3320 	thread_t th = current_thread();
3321 	struct uthread *uth = get_bsdthread_info(th);
3322 	workq_threadreq_t kqr = uth->uu_kqr_bound;
3323 	workq_threadreq_param_t trp = { };
3324 	int nevents = uap->affinity, error;
3325 	user_addr_t eventlist = uap->item;
3326 
3327 	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3328 	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
3329 		return EINVAL;
3330 	}
3331 
3332 	if (eventlist && nevents && kqr == NULL) {
3333 		return EINVAL;
3334 	}
3335 
3336 	/* reset signal mask on the workqueue thread to default state */
3337 	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
3338 		proc_lock(p);
3339 		uth->uu_sigmask = ~workq_threadmask;
3340 		proc_unlock(p);
3341 	}
3342 
3343 	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
3344 		/*
3345 		 * Ensure we store the threadreq param before unbinding
3346 		 * the kqr from this thread.
3347 		 */
3348 		trp = kqueue_threadreq_workloop_param(kqr);
3349 	}
3350 
3351 	/*
3352 	 * Freeze the base pri while we decide the fate of this thread.
3353 	 *
3354 	 * Either:
3355 	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
3356 	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
3357 	 */
3358 	thread_freeze_base_pri(th);
3359 
3360 	if (kqr) {
3361 		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
3362 		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
3363 			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
3364 		} else {
3365 			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
3366 		}
3367 		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
3368 			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
3369 		} else {
3370 			if (workq_thread_is_overcommit(uth)) {
3371 				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
3372 			}
3373 			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
3374 				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
3375 			} else {
3376 				upcall_flags |= uth->uu_workq_pri.qos_req |
3377 				    WQ_FLAG_THREAD_PRIO_QOS;
3378 			}
3379 		}
3380 		error = pthread_functions->workq_handle_stack_events(p, th,
3381 		    get_task_map(proc_task(p)), uth->uu_workq_stackaddr,
3382 		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
3383 		if (error) {
3384 			assert(uth->uu_kqr_bound == kqr);
3385 			return error;
3386 		}
3387 
3388 		// pthread is supposed to pass KEVENT_FLAG_PARKING here
3389 		// which should cause the above call to either:
3390 		// - not return
3391 		// - return an error
3392 		// - return 0 and have unbound properly
3393 		assert(uth->uu_kqr_bound == NULL);
3394 	}
3395 
3396 	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);
3397 
3398 	thread_sched_call(th, NULL);
3399 	thread_will_park_or_terminate(th);
3400 #if CONFIG_WORKLOOP_DEBUG
3401 	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
3402 #endif
3403 
3404 	workq_lock_spin(wq);
3405 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3406 	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
3407 	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
3408 	    WQ_SETUP_CLEAR_VOUCHER);
3409 	__builtin_unreachable();
3410 }
3411 
3412 /**
3413  * Multiplexed call to interact with the workqueue mechanism
3414  */
3415 int
3416 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
3417 {
3418 	int options = uap->options;
3419 	int arg2 = uap->affinity;
3420 	int arg3 = uap->prio;
3421 	struct workqueue *wq = proc_get_wqptr(p);
3422 	int error = 0;
3423 
3424 	if ((p->p_lflag & P_LREGISTER) == 0) {
3425 		return EINVAL;
3426 	}
3427 
3428 	switch (options) {
3429 	case WQOPS_QUEUE_NEWSPISUPP: {
3430 		/*
3431 		 * arg2 = offset of serialno into dispatch queue
3432 		 * arg3 = kevent support
3433 		 */
3434 		int offset = arg2;
3435 		if (arg3 & 0x01) {
3436 			// If we get here, then userspace has indicated support for kevent delivery.
3437 		}
3438 
3439 		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
3440 		break;
3441 	}
3442 	case WQOPS_QUEUE_REQTHREADS: {
3443 		/*
3444 		 * arg2 = number of threads to start
3445 		 * arg3 = priority
3446 		 */
3447 		error = workq_reqthreads(p, arg2, arg3, false);
3448 		break;
3449 	}
3450 	/* For requesting threads for the cooperative pool */
3451 	case WQOPS_QUEUE_REQTHREADS2: {
3452 		/*
3453 		 * arg2 = number of threads to start
3454 		 * arg3 = priority
3455 		 */
3456 		error = workq_reqthreads(p, arg2, arg3, true);
3457 		break;
3458 	}
3459 	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
3460 		/*
3461 		 * arg2 = priority for the manager thread
3462 		 *
3463 		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
3464 		 * the low bits of the value contains a scheduling priority
3465 		 * instead of a QOS value
3466 		 */
3467 		pthread_priority_t pri = arg2;
3468 
3469 		if (wq == NULL) {
3470 			error = EINVAL;
3471 			break;
3472 		}
3473 
3474 		/*
3475 		 * Normalize the incoming priority so that it is ordered numerically.
3476 		 */
3477 		if (_pthread_priority_has_sched_pri(pri)) {
3478 			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
3479 			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
3480 		} else {
3481 			thread_qos_t qos = _pthread_priority_thread_qos(pri);
3482 			int relpri = _pthread_priority_relpri(pri);
3483 			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
3484 			    qos == THREAD_QOS_UNSPECIFIED) {
3485 				error = EINVAL;
3486 				break;
3487 			}
3488 			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3489 		}
3490 
3491 		/*
3492 		 * If userspace passes a scheduling priority, that wins over any QoS.
3493 		 * Userspace should takes care not to lower the priority this way.
3494 		 */
3495 		workq_lock_spin(wq);
3496 		if (wq->wq_event_manager_priority < (uint32_t)pri) {
3497 			wq->wq_event_manager_priority = (uint32_t)pri;
3498 		}
3499 		workq_unlock(wq);
3500 		break;
3501 	}
3502 	case WQOPS_THREAD_KEVENT_RETURN:
3503 	case WQOPS_THREAD_WORKLOOP_RETURN:
3504 	case WQOPS_THREAD_RETURN: {
3505 		error = workq_thread_return(p, uap, wq);
3506 		break;
3507 	}
3508 
3509 	case WQOPS_SHOULD_NARROW: {
3510 		/*
3511 		 * arg2 = priority to test
3512 		 * arg3 = unused
3513 		 */
3514 		thread_t th = current_thread();
3515 		struct uthread *uth = get_bsdthread_info(th);
3516 		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3517 		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
3518 			error = EINVAL;
3519 			break;
3520 		}
3521 
3522 		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
3523 		if (qos == THREAD_QOS_UNSPECIFIED) {
3524 			error = EINVAL;
3525 			break;
3526 		}
3527 		workq_lock_spin(wq);
3528 		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
3529 		workq_unlock(wq);
3530 
3531 		*retval = should_narrow;
3532 		break;
3533 	}
3534 	case WQOPS_SETUP_DISPATCH: {
3535 		/*
3536 		 * item = pointer to workq_dispatch_config structure
3537 		 * arg2 = sizeof(item)
3538 		 */
3539 		struct workq_dispatch_config cfg;
3540 		bzero(&cfg, sizeof(cfg));
3541 
3542 		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
3543 		if (error) {
3544 			break;
3545 		}
3546 
3547 		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
3548 		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
3549 			error = ENOTSUP;
3550 			break;
3551 		}
3552 
3553 		/* Load fields from version 1 */
3554 		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;
3555 
3556 		/* Load fields from version 2 */
3557 		if (cfg.wdc_version >= 2) {
3558 			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
3559 		}
3560 
3561 		break;
3562 	}
3563 	default:
3564 		error = EINVAL;
3565 		break;
3566 	}
3567 
3568 	return error;
3569 }
3570 
3571 /*
3572  * We have no work to do, park ourselves on the idle list.
3573  *
3574  * Consumes the workqueue lock and does not return.
3575  */
3576 __attribute__((noreturn, noinline))
3577 static void
3578 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
3579     uint32_t setup_flags)
3580 {
3581 	assert(uth == current_uthread());
3582 	assert(uth->uu_kqr_bound == NULL);
3583 	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return
3584 
3585 	workq_thread_reset_cpupercent(NULL, uth);
3586 
3587 #if CONFIG_PREADOPT_TG
3588 	/* Clear the preadoption thread group on the thread.
3589 	 *
3590 	 * Case 1:
3591 	 *		Creator thread which never picked up a thread request. We set a
3592 	 *		preadoption thread group on creator threads but if it never picked
3593 	 *		up a thread request and didn't go to userspace, then the thread will
3594 	 *		park with a preadoption thread group but no explicitly adopted
3595 	 *		voucher or work interval.
3596 	 *
3597 	 *		We drop the preadoption thread group here before proceeding to park.
3598 	 *		Note - we may get preempted when we drop the workq lock below.
3599 	 *
3600 	 * Case 2:
3601 	 *		Thread picked up a thread request and bound to it and returned back
3602 	 *		from userspace and is parking. At this point, preadoption thread
3603 	 *		group should be NULL since the thread has unbound from the thread
3604 	 *		request. So this operation should be a no-op.
3605 	 */
3606 	thread_set_preadopt_thread_group(get_machthread(uth), NULL);
3607 #endif
3608 
3609 	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
3610 	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
3611 		workq_unlock(wq);
3612 
3613 		/*
3614 		 * workq_push_idle_thread() will unset `has_stack`
3615 		 * if it wants us to free the stack before parking.
3616 		 */
3617 		if (!uth->uu_save.uus_workq_park_data.has_stack) {
3618 			pthread_functions->workq_markfree_threadstack(p,
3619 			    get_machthread(uth), get_task_map(proc_task(p)),
3620 			    uth->uu_workq_stackaddr);
3621 		}
3622 
3623 		/*
3624 		 * When we remove the voucher from the thread, we may lose our importance
3625 		 * causing us to get preempted, so we do this after putting the thread on
3626 		 * the idle list.  Then, when we get our importance back we'll be able to
3627 		 * use this thread from e.g. the kevent call out to deliver a boosting
3628 		 * message.
3629 		 *
3630 		 * Note that setting the voucher to NULL will not clear the preadoption
3631 		 * thread since this thread could have become the creator again and
3632 		 * perhaps acquired a preadoption thread group.
3633 		 */
3634 		__assert_only kern_return_t kr;
3635 		kr = thread_set_voucher_name(MACH_PORT_NULL);
3636 		assert(kr == KERN_SUCCESS);
3637 
3638 		workq_lock_spin(wq);
3639 		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
3640 		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
3641 	}
3642 
3643 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3644 
3645 	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
3646 		/*
3647 		 * While we'd dropped the lock to unset our voucher, someone came
3648 		 * around and made us runnable.  But because we weren't waiting on the
3649 		 * event their thread_wakeup() was ineffectual.  To correct for that,
3650 		 * we just run the continuation ourselves.
3651 		 */
3652 		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
3653 		__builtin_unreachable();
3654 	}
3655 
3656 	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3657 		workq_unpark_for_death_and_unlock(p, wq, uth,
3658 		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
3659 		__builtin_unreachable();
3660 	}
3661 
3662 	/* Disarm the workqueue quantum since the thread is now idle */
3663 	thread_disarm_workqueue_quantum(get_machthread(uth));
3664 
3665 	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
3666 	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
3667 	workq_unlock(wq);
3668 	thread_block(workq_unpark_continue);
3669 	__builtin_unreachable();
3670 }
3671 
3672 static inline bool
3673 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3674 {
3675 	/*
3676 	 * There's an event manager request and either:
3677 	 * - no event manager currently running
3678 	 * - we are re-using the event manager
3679 	 */
3680 	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3681 	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3682 }
3683 
3684 static uint32_t
3685 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3686     struct uthread *uth, bool may_start_timer)
3687 {
3688 	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3689 	uint32_t count = 0;
3690 
3691 	uint32_t max_count = wq->wq_constrained_threads_scheduled;
3692 	if (uth && workq_thread_is_nonovercommit(uth)) {
3693 		/*
3694 		 * don't count the current thread as scheduled
3695 		 */
3696 		assert(max_count > 0);
3697 		max_count--;
3698 	}
3699 	if (max_count >= wq_max_constrained_threads) {
3700 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3701 		    wq->wq_constrained_threads_scheduled,
3702 		    wq_max_constrained_threads);
3703 		/*
3704 		 * we need 1 or more constrained threads to return to the kernel before
3705 		 * we can dispatch additional work
3706 		 */
3707 		return 0;
3708 	}
3709 	max_count -= wq_max_constrained_threads;
3710 
3711 	/*
3712 	 * Compute a metric for many how many threads are active.  We find the
3713 	 * highest priority request outstanding and then add up the number of active
3714 	 * threads in that and all higher-priority buckets.  We'll also add any
3715 	 * "busy" threads which are not currently active but blocked recently enough
3716 	 * that we can't be sure that they won't be unblocked soon and start
3717 	 * being active again.
3718 	 *
3719 	 * We'll then compare this metric to our max concurrency to decide whether
3720 	 * to add a new thread.
3721 	 */
3722 
3723 	uint32_t busycount, thactive_count;
3724 
3725 	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3726 	    at_qos, &busycount, NULL);
3727 
3728 	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3729 	    at_qos <= uth->uu_workq_pri.qos_bucket) {
3730 		/*
3731 		 * Don't count this thread as currently active, but only if it's not
3732 		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3733 		 * managers.
3734 		 */
3735 		assert(thactive_count > 0);
3736 		thactive_count--;
3737 	}
3738 
3739 	count = wq_max_parallelism[_wq_bucket(at_qos)];
3740 	if (count > thactive_count + busycount) {
3741 		count -= thactive_count + busycount;
3742 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
3743 		    thactive_count, busycount);
3744 		return MIN(count, max_count);
3745 	} else {
3746 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
3747 		    thactive_count, busycount);
3748 	}
3749 
3750 	if (may_start_timer) {
3751 		/*
3752 		 * If this is called from the add timer, we won't have another timer
3753 		 * fire when the thread exits the "busy" state, so rearm the timer.
3754 		 */
3755 		workq_schedule_delayed_thread_creation(wq, 0);
3756 	}
3757 
3758 	return 0;
3759 }
3760 
3761 static bool
3762 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
3763     workq_threadreq_t req)
3764 {
3765 	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
3766 		return workq_may_start_event_mgr_thread(wq, uth);
3767 	}
3768 	if (workq_threadreq_is_cooperative(req)) {
3769 		return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
3770 	}
3771 	if (workq_threadreq_is_nonovercommit(req)) {
3772 		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
3773 	}
3774 
3775 	return true;
3776 }
3777 
3778 /*
3779  * Called from the context of selecting thread requests for threads returning
3780  * from userspace or creator thread
3781  */
3782 static workq_threadreq_t
3783 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
3784 {
3785 	workq_lock_held(wq);
3786 
3787 	/*
3788 	 * If the current thread is cooperative, we need to exclude it as part of
3789 	 * cooperative schedule count since this thread is looking for a new
3790 	 * request. Change in the schedule count for cooperative pool therefore
3791 	 * requires us to reeevaluate the next best request for it.
3792 	 */
3793 	if (uth && workq_thread_is_cooperative(uth)) {
3794 		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
3795 
3796 		(void) _wq_cooperative_queue_refresh_best_req_qos(wq);
3797 
3798 		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
3799 	} else {
3800 		/*
3801 		 * The old value that was already precomputed should be safe to use -
3802 		 * add an assert that asserts that the best req QoS doesn't change in
3803 		 * this case
3804 		 */
3805 		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
3806 	}
3807 
3808 	thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;
3809 
3810 	/* There are no eligible requests in the cooperative pool */
3811 	if (qos == THREAD_QOS_UNSPECIFIED) {
3812 		return NULL;
3813 	}
3814 	assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
3815 	assert(qos != WORKQ_THREAD_QOS_MANAGER);
3816 
3817 	uint8_t bucket = _wq_bucket(qos);
3818 	assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));
3819 
3820 	return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
3821 }
3822 
3823 static workq_threadreq_t
3824 workq_threadreq_select_for_creator(struct workqueue *wq)
3825 {
3826 	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
3827 	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
3828 	uint8_t pri = 0;
3829 
3830 	/*
3831 	 * Compute the best priority request, and ignore the turnstile for now
3832 	 */
3833 
3834 	req_pri = priority_queue_max(&wq->wq_special_queue,
3835 	    struct workq_threadreq_s, tr_entry);
3836 	if (req_pri) {
3837 		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
3838 		    &req_pri->tr_entry);
3839 	}
3840 
3841 	/*
3842 	 * Handle the manager thread request. The special queue might yield
3843 	 * a higher priority, but the manager always beats the QoS world.
3844 	 */
3845 
3846 	req_mgr = wq->wq_event_manager_threadreq;
3847 	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
3848 		uint32_t mgr_pri = wq->wq_event_manager_priority;
3849 
3850 		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
3851 			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
3852 		} else {
3853 			mgr_pri = thread_workq_pri_for_qos(
3854 				_pthread_priority_thread_qos(mgr_pri));
3855 		}
3856 
3857 		return mgr_pri >= pri ? req_mgr : req_pri;
3858 	}
3859 
3860 	/*
3861 	 * Compute the best QoS Request, and check whether it beats the "pri" one
3862 	 *
3863 	 * Start by comparing the overcommit and the cooperative pool
3864 	 */
3865 	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
3866 	    struct workq_threadreq_s, tr_entry);
3867 	if (req_qos) {
3868 		qos = req_qos->tr_qos;
3869 	}
3870 
3871 	req_tmp = workq_cooperative_queue_best_req(wq, NULL);
3872 	if (req_tmp && qos <= req_tmp->tr_qos) {
3873 		/*
3874 		 * Cooperative TR is better between overcommit and cooperative.  Note
3875 		 * that if qos is same between overcommit and cooperative, we choose
3876 		 * cooperative.
3877 		 *
3878 		 * Pick cooperative pool if it passes the admissions check
3879 		 */
3880 		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3881 			req_qos = req_tmp;
3882 			qos = req_qos->tr_qos;
3883 		}
3884 	}
3885 
3886 	/*
3887 	 * Compare the best QoS so far - either from overcommit or from cooperative
3888 	 * pool - and compare it with the constrained pool
3889 	 */
3890 	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
3891 	    struct workq_threadreq_s, tr_entry);
3892 
3893 	if (req_tmp && qos < req_tmp->tr_qos) {
3894 		/*
3895 		 * Constrained pool is best in QoS between overcommit, cooperative
3896 		 * and constrained. Now check how it fairs against the priority case
3897 		 */
3898 		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
3899 			return req_pri;
3900 		}
3901 
3902 		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3903 			/*
3904 			 * If the constrained thread request is the best one and passes
3905 			 * the admission check, pick it.
3906 			 */
3907 			return req_tmp;
3908 		}
3909 	}
3910 
3911 	/*
3912 	 * Compare the best of the QoS world with the priority
3913 	 */
3914 	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3915 		return req_pri;
3916 	}
3917 
3918 	if (req_qos) {
3919 		return req_qos;
3920 	}
3921 
3922 	/*
3923 	 * If we had no eligible request but we have a turnstile push,
3924 	 * it must be a non overcommit thread request that failed
3925 	 * the admission check.
3926 	 *
3927 	 * Just fake a BG thread request so that if the push stops the creator
3928 	 * priority just drops to 4.
3929 	 */
3930 	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
3931 		static struct workq_threadreq_s workq_sync_push_fake_req = {
3932 			.tr_qos = THREAD_QOS_BACKGROUND,
3933 		};
3934 
3935 		return &workq_sync_push_fake_req;
3936 	}
3937 
3938 	return NULL;
3939 }
3940 
3941 /*
3942  * Returns true if this caused a change in the schedule counts of the
3943  * cooperative pool
3944  */
3945 static bool
3946 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
3947     struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
3948 {
3949 	workq_lock_held(wq);
3950 
3951 	/*
3952 	 * Row: thread type
3953 	 * Column: Request type
3954 	 *
3955 	 *					overcommit		non-overcommit		cooperative
3956 	 * overcommit			X				case 1				case 2
3957 	 * cooperative		case 3				case 4				case 5
3958 	 * non-overcommit	case 6					X				case 7
3959 	 *
3960 	 * Move the thread to the right bucket depending on what state it currently
3961 	 * has and what state the thread req it picks, is going to have.
3962 	 *
3963 	 * Note that the creator thread is an overcommit thread.
3964 	 */
3965 	thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req;
3966 
3967 	/*
3968 	 * Anytime a cooperative bucket's schedule count changes, we need to
3969 	 * potentially refresh the next best QoS for that pool when we determine
3970 	 * the next request for the creator
3971 	 */
3972 	bool cooperative_pool_sched_count_changed = false;
3973 
3974 	if (workq_thread_is_overcommit(uth)) {
3975 		if (workq_tr_is_nonovercommit(tr_flags)) {
3976 			// Case 1: thread is overcommit, req is non-overcommit
3977 			wq->wq_constrained_threads_scheduled++;
3978 		} else if (workq_tr_is_cooperative(tr_flags)) {
3979 			// Case 2: thread is overcommit, req is cooperative
3980 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3981 			cooperative_pool_sched_count_changed = true;
3982 		}
3983 	} else if (workq_thread_is_cooperative(uth)) {
3984 		if (workq_tr_is_overcommit(tr_flags)) {
3985 			// Case 3: thread is cooperative, req is overcommit
3986 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3987 		} else if (workq_tr_is_nonovercommit(tr_flags)) {
3988 			// Case 4: thread is cooperative, req is non-overcommit
3989 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3990 			wq->wq_constrained_threads_scheduled++;
3991 		} else {
3992 			// Case 5: thread is cooperative, req is also cooperative
3993 			assert(workq_tr_is_cooperative(tr_flags));
3994 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3995 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3996 		}
3997 		cooperative_pool_sched_count_changed = true;
3998 	} else {
3999 		if (workq_tr_is_overcommit(tr_flags)) {
4000 			// Case 6: Thread is non-overcommit, req is overcommit
4001 			wq->wq_constrained_threads_scheduled--;
4002 		} else if (workq_tr_is_cooperative(tr_flags)) {
4003 			// Case 7: Thread is non-overcommit, req is cooperative
4004 			wq->wq_constrained_threads_scheduled--;
4005 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
4006 			cooperative_pool_sched_count_changed = true;
4007 		}
4008 	}
4009 
4010 	return cooperative_pool_sched_count_changed;
4011 }
4012 
4013 static workq_threadreq_t
4014 workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
4015 {
4016 	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
4017 	uintptr_t proprietor;
4018 	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
4019 	uint8_t pri = 0;
4020 
4021 	if (uth == wq->wq_creator) {
4022 		uth = NULL;
4023 	}
4024 
4025 	/*
4026 	 * Compute the best priority request (special or turnstile)
4027 	 */
4028 
4029 	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
4030 	    &proprietor);
4031 	if (pri) {
4032 		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
4033 		req_pri = &kqwl->kqwl_request;
4034 		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
4035 			panic("Invalid thread request (%p) state %d",
4036 			    req_pri, req_pri->tr_state);
4037 		}
4038 	} else {
4039 		req_pri = NULL;
4040 	}
4041 
4042 	req_tmp = priority_queue_max(&wq->wq_special_queue,
4043 	    struct workq_threadreq_s, tr_entry);
4044 	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
4045 	    &req_tmp->tr_entry)) {
4046 		req_pri = req_tmp;
4047 		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
4048 		    &req_tmp->tr_entry);
4049 	}
4050 
4051 	/*
4052 	 * Handle the manager thread request. The special queue might yield
4053 	 * a higher priority, but the manager always beats the QoS world.
4054 	 */
4055 
4056 	req_mgr = wq->wq_event_manager_threadreq;
4057 	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
4058 		uint32_t mgr_pri = wq->wq_event_manager_priority;
4059 
4060 		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
4061 			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
4062 		} else {
4063 			mgr_pri = thread_workq_pri_for_qos(
4064 				_pthread_priority_thread_qos(mgr_pri));
4065 		}
4066 
4067 		return mgr_pri >= pri ? req_mgr : req_pri;
4068 	}
4069 
4070 	/*
4071 	 * Compute the best QoS Request, and check whether it beats the "pri" one
4072 	 */
4073 
4074 	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
4075 	    struct workq_threadreq_s, tr_entry);
4076 	if (req_qos) {
4077 		qos = req_qos->tr_qos;
4078 	}
4079 
4080 	req_tmp = workq_cooperative_queue_best_req(wq, uth);
4081 	if (req_tmp && qos <= req_tmp->tr_qos) {
4082 		/*
4083 		 * Cooperative TR is better between overcommit and cooperative.  Note
4084 		 * that if qos is same between overcommit and cooperative, we choose
4085 		 * cooperative.
4086 		 *
4087 		 * Pick cooperative pool if it passes the admissions check
4088 		 */
4089 		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
4090 			req_qos = req_tmp;
4091 			qos = req_qos->tr_qos;
4092 		}
4093 	}
4094 
4095 	/*
4096 	 * Compare the best QoS so far - either from overcommit or from cooperative
4097 	 * pool - and compare it with the constrained pool
4098 	 */
4099 	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
4100 	    struct workq_threadreq_s, tr_entry);
4101 
4102 	if (req_tmp && qos < req_tmp->tr_qos) {
4103 		/*
4104 		 * Constrained pool is best in QoS between overcommit, cooperative
4105 		 * and constrained. Now check how it fairs against the priority case
4106 		 */
4107 		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
4108 			return req_pri;
4109 		}
4110 
4111 		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
4112 			/*
4113 			 * If the constrained thread request is the best one and passes
4114 			 * the admission check, pick it.
4115 			 */
4116 			return req_tmp;
4117 		}
4118 	}
4119 
4120 	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
4121 		return req_pri;
4122 	}
4123 
4124 	return req_qos;
4125 }
4126 
4127 /*
4128  * The creator is an anonymous thread that is counted as scheduled,
4129  * but otherwise without its scheduler callback set or tracked as active
4130  * that is used to make other threads.
4131  *
4132  * When more requests are added or an existing one is hurried along,
4133  * a creator is elected and setup, or the existing one overridden accordingly.
4134  *
4135  * While this creator is in flight, because no request has been dequeued,
4136  * already running threads have a chance at stealing thread requests avoiding
4137  * useless context switches, and the creator once scheduled may not find any
4138  * work to do and will then just park again.
4139  *
4140  * The creator serves the dual purpose of informing the scheduler of work that
4141  * hasn't be materialized as threads yet, and also as a natural pacing mechanism
4142  * for thread creation.
4143  *
4144  * By being anonymous (and not bound to anything) it means that thread requests
4145  * can be stolen from this creator by threads already on core yielding more
4146  * efficient scheduling and reduced context switches.
4147  */
4148 static void
4149 workq_schedule_creator(proc_t p, struct workqueue *wq,
4150     workq_kern_threadreq_flags_t flags)
4151 {
4152 	workq_threadreq_t req;
4153 	struct uthread *uth;
4154 	bool needs_wakeup;
4155 
4156 	workq_lock_held(wq);
4157 	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);
4158 
4159 again:
4160 	uth = wq->wq_creator;
4161 
4162 	if (!wq->wq_reqcount) {
4163 		/*
4164 		 * There is no thread request left.
4165 		 *
4166 		 * If there is a creator, leave everything in place, so that it cleans
4167 		 * up itself in workq_push_idle_thread().
4168 		 *
4169 		 * Else, make sure the turnstile state is reset to no inheritor.
4170 		 */
4171 		if (uth == NULL) {
4172 			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4173 		}
4174 		return;
4175 	}
4176 
4177 	req = workq_threadreq_select_for_creator(wq);
4178 	if (req == NULL) {
4179 		/*
4180 		 * There isn't a thread request that passes the admission check.
4181 		 *
4182 		 * If there is a creator, do not touch anything, the creator will sort
4183 		 * it out when it runs.
4184 		 *
4185 		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
4186 		 * code calls us if anything changes.
4187 		 */
4188 		if (uth == NULL) {
4189 			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
4190 		}
4191 		return;
4192 	}
4193 
4194 
4195 	if (uth) {
4196 		/*
4197 		 * We need to maybe override the creator we already have
4198 		 */
4199 		if (workq_thread_needs_priority_change(req, uth)) {
4200 			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4201 			    wq, 1, uthread_tid(uth), req->tr_qos);
4202 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4203 		}
4204 		assert(wq->wq_inheritor == get_machthread(uth));
4205 	} else if (wq->wq_thidlecount) {
4206 		/*
4207 		 * We need to unpark a creator thread
4208 		 */
4209 		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
4210 		    &needs_wakeup);
4211 		/* Always reset the priorities on the newly chosen creator */
4212 		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4213 		workq_turnstile_update_inheritor(wq, get_machthread(uth),
4214 		    TURNSTILE_INHERITOR_THREAD);
4215 		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4216 		    wq, 2, uthread_tid(uth), req->tr_qos);
4217 		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4218 		uth->uu_save.uus_workq_park_data.yields = 0;
4219 		if (needs_wakeup) {
4220 			workq_thread_wakeup(uth);
4221 		}
4222 	} else {
4223 		/*
4224 		 * We need to allocate a thread...
4225 		 */
4226 		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
4227 			/* out of threads, just go away */
4228 			flags = WORKQ_THREADREQ_NONE;
4229 		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
4230 			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
4231 		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
4232 			/* This can drop the workqueue lock, and take it again */
4233 			workq_schedule_immediate_thread_creation(wq);
4234 		} else if (workq_add_new_idle_thread(p, wq)) {
4235 			goto again;
4236 		} else {
4237 			workq_schedule_delayed_thread_creation(wq, 0);
4238 		}
4239 
4240 		/*
4241 		 * If the current thread is the inheritor:
4242 		 *
4243 		 * If we set the AST, then the thread will stay the inheritor until
4244 		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
4245 		 * and calls workq_push_idle_thread().
4246 		 *
4247 		 * Else, the responsibility of the thread creation is with a thread-call
4248 		 * and we need to clear the inheritor.
4249 		 */
4250 		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
4251 		    wq->wq_inheritor == current_thread()) {
4252 			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4253 		}
4254 	}
4255 }
4256 
4257 /**
4258  * Same as workq_unpark_select_threadreq_or_park_and_unlock,
4259  * but do not allow early binds.
4260  *
4261  * Called with the base pri frozen, will unfreeze it.
4262  */
4263 __attribute__((noreturn, noinline))
4264 static void
4265 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4266     struct uthread *uth, uint32_t setup_flags)
4267 {
4268 	workq_threadreq_t req = NULL;
4269 	bool is_creator = (wq->wq_creator == uth);
4270 	bool schedule_creator = false;
4271 
4272 	if (__improbable(_wq_exiting(wq))) {
4273 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
4274 		goto park;
4275 	}
4276 
4277 	if (wq->wq_reqcount == 0) {
4278 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
4279 		goto park;
4280 	}
4281 
4282 	req = workq_threadreq_select(wq, uth);
4283 	if (__improbable(req == NULL)) {
4284 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
4285 		goto park;
4286 	}
4287 
4288 	struct uu_workq_policy old_pri = uth->uu_workq_pri;
4289 	uint8_t tr_flags = req->tr_flags;
4290 	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);
4291 
4292 	/*
4293 	 * Attempt to setup ourselves as the new thing to run, moving all priority
4294 	 * pushes to ourselves.
4295 	 *
4296 	 * If the current thread is the creator, then the fact that we are presently
4297 	 * running is proof that we'll do something useful, so keep going.
4298 	 *
4299 	 * For other cases, peek at the AST to know whether the scheduler wants
4300 	 * to preempt us, if yes, park instead, and move the thread request
4301 	 * turnstile back to the workqueue.
4302 	 */
4303 	if (req_ts) {
4304 		workq_perform_turnstile_operation_locked(wq, ^{
4305 			turnstile_update_inheritor(req_ts, get_machthread(uth),
4306 			TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
4307 			turnstile_update_inheritor_complete(req_ts,
4308 			TURNSTILE_INTERLOCK_HELD);
4309 		});
4310 	}
4311 
4312 	/* accounting changes of aggregate thscheduled_count and thactive which has
4313 	 * to be paired with the workq_thread_reset_pri below so that we have
4314 	 * uth->uu_workq_pri match with thactive.
4315 	 *
4316 	 * This is undone when the thread parks */
4317 	if (is_creator) {
4318 		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
4319 		    uth->uu_save.uus_workq_park_data.yields);
4320 		wq->wq_creator = NULL;
4321 		_wq_thactive_inc(wq, req->tr_qos);
4322 		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
4323 	} else if (old_pri.qos_bucket != req->tr_qos) {
4324 		_wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos);
4325 	}
4326 	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4327 
4328 	/*
4329 	 * Make relevant accounting changes for pool specific counts.
4330 	 *
4331 	 * The schedule counts changing can affect what the next best request
4332 	 * for cooperative thread pool is if this request is dequeued.
4333 	 */
4334 	bool cooperative_sched_count_changed =
4335 	    workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
4336 	    old_pri.qos_req, tr_flags);
4337 
4338 	if (workq_tr_is_overcommit(tr_flags)) {
4339 		workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
4340 	} else if (workq_tr_is_cooperative(tr_flags)) {
4341 		workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
4342 	} else {
4343 		workq_thread_set_type(uth, 0);
4344 	}
4345 
4346 	if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
4347 		if (req_ts) {
4348 			workq_perform_turnstile_operation_locked(wq, ^{
4349 				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
4350 				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
4351 				turnstile_update_inheritor_complete(req_ts,
4352 				TURNSTILE_INTERLOCK_HELD);
4353 			});
4354 		}
4355 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
4356 		goto park_thawed;
4357 	}
4358 
4359 	/*
4360 	 * We passed all checks, dequeue the request, bind to it, and set it up
4361 	 * to return to user.
4362 	 */
4363 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4364 	    workq_trace_req_id(req), tr_flags, 0);
4365 	wq->wq_fulfilled++;
4366 	schedule_creator = workq_threadreq_dequeue(wq, req,
4367 	    cooperative_sched_count_changed);
4368 
4369 	workq_thread_reset_cpupercent(req, uth);
4370 
4371 	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4372 		kqueue_threadreq_bind_prepost(p, req, uth);
4373 		req = NULL;
4374 	} else if (req->tr_count > 0) {
4375 		req = NULL;
4376 	}
4377 
4378 	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4379 		uth->uu_workq_flags ^= UT_WORKQ_NEW;
4380 		setup_flags |= WQ_SETUP_FIRST_USE;
4381 	}
4382 
4383 	/* If one of the following is true, call workq_schedule_creator (which also
4384 	 * adjusts priority of existing creator):
4385 	 *
4386 	 *	  - We are the creator currently so the wq may need a new creator
4387 	 *	  - The request we're binding to is the highest priority one, existing
4388 	 *	  creator's priority might need to be adjusted to reflect the next
4389 	 *	  highest TR
4390 	 */
4391 	if (is_creator || schedule_creator) {
4392 		/* This can drop the workqueue lock, and take it again */
4393 		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
4394 	}
4395 
4396 	workq_unlock(wq);
4397 
4398 	if (req) {
4399 		zfree(workq_zone_threadreq, req);
4400 	}
4401 
4402 	/*
4403 	 * Run Thread, Run!
4404 	 */
4405 	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
4406 	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
4407 		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
4408 	} else if (workq_tr_is_overcommit(tr_flags)) {
4409 		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
4410 	} else if (workq_tr_is_cooperative(tr_flags)) {
4411 		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
4412 	}
4413 	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
4414 		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
4415 		assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
4416 	}
4417 
4418 	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
4419 		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
4420 	}
4421 	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
4422 
4423 	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4424 		kqueue_threadreq_bind_commit(p, get_machthread(uth));
4425 	} else {
4426 #if CONFIG_PREADOPT_TG
4427 		/*
4428 		 * The thread may have a preadopt thread group on it already because it
4429 		 * got tagged with it as a creator thread. So we need to make sure to
4430 		 * clear that since we don't have preadoption for anonymous thread
4431 		 * requests
4432 		 */
4433 		thread_set_preadopt_thread_group(get_machthread(uth), NULL);
4434 #endif
4435 	}
4436 
4437 	workq_setup_and_run(p, uth, setup_flags);
4438 	__builtin_unreachable();
4439 
4440 park:
4441 	thread_unfreeze_base_pri(get_machthread(uth));
4442 park_thawed:
4443 	workq_park_and_unlock(p, wq, uth, setup_flags);
4444 }
4445 
4446 /**
4447  * Runs a thread request on a thread
4448  *
4449  * - if thread is THREAD_NULL, will find a thread and run the request there.
4450  *   Otherwise, the thread must be the current thread.
4451  *
4452  * - if req is NULL, will find the highest priority request and run that.  If
4453  *   it is not NULL, it must be a threadreq object in state NEW.  If it can not
4454  *   be run immediately, it will be enqueued and moved to state QUEUED.
4455  *
4456  *   Either way, the thread request object serviced will be moved to state
4457  *   BINDING and attached to the uthread.
4458  *
4459  * Should be called with the workqueue lock held.  Will drop it.
4460  * Should be called with the base pri not frozen.
4461  */
4462 __attribute__((noreturn, noinline))
4463 static void
4464 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4465     struct uthread *uth, uint32_t setup_flags)
4466 {
4467 	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
4468 		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4469 			setup_flags |= WQ_SETUP_FIRST_USE;
4470 		}
4471 		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
4472 		/*
4473 		 * This pointer is possibly freed and only used for tracing purposes.
4474 		 */
4475 		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
4476 		workq_unlock(wq);
4477 		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4478 		    VM_KERNEL_ADDRHIDE(req), 0, 0);
4479 		(void)req;
4480 
4481 		workq_setup_and_run(p, uth, setup_flags);
4482 		__builtin_unreachable();
4483 	}
4484 
4485 	thread_freeze_base_pri(get_machthread(uth));
4486 	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
4487 }
4488 
4489 static bool
4490 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4491 {
4492 	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4493 
4494 	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4495 		return false;
4496 	}
4497 
4498 	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4499 	if (wq->wq_fulfilled == snapshot) {
4500 		return false;
4501 	}
4502 
4503 	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4504 	if (wq->wq_fulfilled - snapshot > conc) {
4505 		/* we fulfilled more than NCPU requests since being dispatched */
4506 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4507 		    wq->wq_fulfilled, snapshot);
4508 		return true;
4509 	}
4510 
4511 	for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4512 		cnt += wq->wq_thscheduled_count[i];
4513 	}
4514 	if (conc <= cnt) {
4515 		/* We fulfilled requests and have more than NCPU scheduled threads */
4516 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4517 		    wq->wq_fulfilled, snapshot);
4518 		return true;
4519 	}
4520 
4521 	return false;
4522 }
4523 
4524 /**
4525  * parked thread wakes up
4526  */
4527 __attribute__((noreturn, noinline))
4528 static void
4529 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
4530 {
4531 	thread_t th = current_thread();
4532 	struct uthread *uth = get_bsdthread_info(th);
4533 	proc_t p = current_proc();
4534 	struct workqueue *wq = proc_get_wqptr_fast(p);
4535 
4536 	workq_lock_spin(wq);
4537 
4538 	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
4539 		/*
4540 		 * If the number of threads we have out are able to keep up with the
4541 		 * demand, then we should avoid sending this creator thread to
4542 		 * userspace.
4543 		 */
4544 		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4545 		uth->uu_save.uus_workq_park_data.yields++;
4546 		workq_unlock(wq);
4547 		thread_yield_with_continuation(workq_unpark_continue, NULL);
4548 		__builtin_unreachable();
4549 	}
4550 
4551 	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
4552 		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
4553 		__builtin_unreachable();
4554 	}
4555 
4556 	if (__probable(wr == THREAD_AWAKENED)) {
4557 		/*
4558 		 * We were set running, but for the purposes of dying.
4559 		 */
4560 		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
4561 		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
4562 	} else {
4563 		/*
4564 		 * workaround for <rdar://problem/38647347>,
4565 		 * in case we do hit userspace, make sure calling
4566 		 * workq_thread_terminate() does the right thing here,
4567 		 * and if we never call it, that workq_exit() will too because it sees
4568 		 * this thread on the runlist.
4569 		 */
4570 		assert(wr == THREAD_INTERRUPTED);
4571 		wq->wq_thdying_count++;
4572 		uth->uu_workq_flags |= UT_WORKQ_DYING;
4573 	}
4574 
4575 	workq_unpark_for_death_and_unlock(p, wq, uth,
4576 	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
4577 	__builtin_unreachable();
4578 }
4579 
4580 __attribute__((noreturn, noinline))
4581 static void
4582 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
4583 {
4584 	thread_t th = get_machthread(uth);
4585 	vm_map_t vmap = get_task_map(proc_task(p));
4586 
4587 	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
4588 		/*
4589 		 * For preemption reasons, we want to reset the voucher as late as
4590 		 * possible, so we do it in two places:
4591 		 *   - Just before parking (i.e. in workq_park_and_unlock())
4592 		 *   - Prior to doing the setup for the next workitem (i.e. here)
4593 		 *
4594 		 * Those two places are sufficient to ensure we always reset it before
4595 		 * it goes back out to user space, but be careful to not break that
4596 		 * guarantee.
4597 		 *
4598 		 * Note that setting the voucher to NULL will not clear the preadoption
4599 		 * thread group on this thread
4600 		 */
4601 		__assert_only kern_return_t kr;
4602 		kr = thread_set_voucher_name(MACH_PORT_NULL);
4603 		assert(kr == KERN_SUCCESS);
4604 	}
4605 
4606 	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
4607 	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
4608 		upcall_flags |= WQ_FLAG_THREAD_REUSE;
4609 	}
4610 
4611 	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
4612 		/*
4613 		 * For threads that have an outside-of-QoS thread priority, indicate
4614 		 * to userspace that setting QoS should only affect the TSD and not
4615 		 * change QOS in the kernel.
4616 		 */
4617 		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
4618 	} else {
4619 		/*
4620 		 * Put the QoS class value into the lower bits of the reuse_thread
4621 		 * register, this is where the thread priority used to be stored
4622 		 * anyway.
4623 		 */
4624 		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
4625 		    WQ_FLAG_THREAD_PRIO_QOS;
4626 	}
4627 
4628 	if (uth->uu_workq_thport == MACH_PORT_NULL) {
4629 		/* convert_thread_to_port_pinned() consumes a reference */
4630 		thread_reference(th);
4631 		/* Convert to immovable/pinned thread port, but port is not pinned yet */
4632 		ipc_port_t port = convert_thread_to_port_pinned(th);
4633 		/* Atomically, pin and copy out the port */
4634 		uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p)));
4635 	}
4636 
4637 	/* Thread has been set up to run, arm its next workqueue quantum or disarm
4638 	 * if it is no longer supporting that */
4639 	if (thread_supports_cooperative_workqueue(th)) {
4640 		thread_arm_workqueue_quantum(th);
4641 	} else {
4642 		thread_disarm_workqueue_quantum(th);
4643 	}
4644 
4645 	/*
4646 	 * Call out to pthread, this sets up the thread, pulls in kevent structs
4647 	 * onto the stack, sets up the thread state and then returns to userspace.
4648 	 */
4649 	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
4650 	    proc_get_wqptr_fast(p), 0, 0, 0);
4651 
4652 	if (workq_thread_is_cooperative(uth)) {
4653 		thread_sched_call(th, NULL);
4654 	} else {
4655 		thread_sched_call(th, workq_sched_callback);
4656 	}
4657 
4658 	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
4659 	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);
4660 
4661 	__builtin_unreachable();
4662 }
4663 
4664 #pragma mark misc
4665 
4666 int
4667 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
4668 {
4669 	struct workqueue *wq = proc_get_wqptr(p);
4670 	int error = 0;
4671 	int     activecount;
4672 
4673 	if (wq == NULL) {
4674 		return EINVAL;
4675 	}
4676 
4677 	/*
4678 	 * This is sometimes called from interrupt context by the kperf sampler.
4679 	 * In that case, it's not safe to spin trying to take the lock since we
4680 	 * might already hold it.  So, we just try-lock it and error out if it's
4681 	 * already held.  Since this is just a debugging aid, and all our callers
4682 	 * are able to handle an error, that's fine.
4683 	 */
4684 	bool locked = workq_lock_try(wq);
4685 	if (!locked) {
4686 		return EBUSY;
4687 	}
4688 
4689 	wq_thactive_t act = _wq_thactive(wq);
4690 	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
4691 	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
4692 	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
4693 		activecount++;
4694 	}
4695 	pwqinfo->pwq_nthreads = wq->wq_nthreads;
4696 	pwqinfo->pwq_runthreads = activecount;
4697 	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
4698 	pwqinfo->pwq_state = 0;
4699 
4700 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4701 		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4702 	}
4703 
4704 	if (wq->wq_nthreads >= wq_max_threads) {
4705 		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4706 	}
4707 
4708 	workq_unlock(wq);
4709 	return error;
4710 }
4711 
4712 boolean_t
4713 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
4714     boolean_t *exceeded_constrained)
4715 {
4716 	proc_t p = v;
4717 	struct proc_workqueueinfo pwqinfo;
4718 	int err;
4719 
4720 	assert(p != NULL);
4721 	assert(exceeded_total != NULL);
4722 	assert(exceeded_constrained != NULL);
4723 
4724 	err = fill_procworkqueue(p, &pwqinfo);
4725 	if (err) {
4726 		return FALSE;
4727 	}
4728 	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
4729 		return FALSE;
4730 	}
4731 
4732 	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
4733 	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
4734 
4735 	return TRUE;
4736 }
4737 
4738 uint32_t
4739 workqueue_get_pwq_state_kdp(void * v)
4740 {
4741 	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
4742 	    kTaskWqExceededConstrainedThreadLimit);
4743 	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
4744 	    kTaskWqExceededTotalThreadLimit);
4745 	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
4746 	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
4747 	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
4748 
4749 	if (v == NULL) {
4750 		return 0;
4751 	}
4752 
4753 	proc_t p = v;
4754 	struct workqueue *wq = proc_get_wqptr(p);
4755 
4756 	if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
4757 		return 0;
4758 	}
4759 
4760 	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
4761 
4762 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4763 		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4764 	}
4765 
4766 	if (wq->wq_nthreads >= wq_max_threads) {
4767 		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4768 	}
4769 
4770 	return pwq_state;
4771 }
4772 
4773 void
4774 workq_init(void)
4775 {
4776 	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
4777 	    NSEC_PER_USEC, &wq_stalled_window.abstime);
4778 	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
4779 	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
4780 	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
4781 	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);
4782 
4783 	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
4784 	    workq_deallocate_queue_invoke);
4785 }
4786