/*
 * Copyright (c) 2018 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include <machine/machine_cpu.h>
#include <kern/locks.h>
#include <kern/mpsc_queue.h>
#include <kern/queue.h>
#include <kern/thread.h>

#pragma mark Single Consumer calls

__attribute__((noinline))
static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
{
	return hw_wait_while_equals_long(ptr, NULL);
}

void
mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
    mpsc_queue_chain_t last)
{
	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);

	os_atomic_store(&last->mpqc_next, head, relaxed);

	if (head == NULL &&
	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
		if (__improbable(head == NULL)) {
			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
		}
		os_atomic_store(&last->mpqc_next, head, relaxed);
	}

	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
}

mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
    os_atomic_dependency_t dependency)
{
	mpsc_queue_chain_t head, tail;

	q = os_atomic_inject_dependency(q, dependency);

	tail = os_atomic_load(&q->mpqh_tail, relaxed);
	if (__improbable(tail == &q->mpqh_head)) {
		*tail_out = NULL;
		return NULL;
	}

	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
	if (__improbable(head == NULL)) {
		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
	}
	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
	/*
	 * 22708742: set tail to &q->mpqh_head with release, so that the NULL write
	 * to head above doesn't clobber a head set by a concurrent enqueuer.
	 *
	 * The other half of the seq_cst is required to pair with any enqueuer that
	 * contributed an element to this list (pairs with the release fence in
	 * __mpsc_queue_append_update_tail()).
	 *
	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
	 * visibility transitive (when items hop from one queue to the next),
	 * which clients implicitly expect.
	 *
	 * Note that this is the same number of fences a traditional lock
	 * would have, but paid as a once-per-batch cost.
	 */
	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);

	return head;
}
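
/*
 * Usage sketch, for illustration only (compiled out): how a single consumer
 * might drain a queue in batches with the calls above. The element type
 * `my_elem`, its `me_link` field and my_consume() are hypothetical;
 * mpsc_queue_batch_foreach_safe() and mpsc_queue_element() come from
 * <kern/mpsc_queue.h>, and OS_ATOMIC_DEPENDENCY_NONE is assumed to be the
 * empty dependency token from <os/atomic_private.h>.
 */
#if 0
struct my_elem {
	struct mpsc_queue_chain me_link;
	int                     me_value;
};

static void
my_drain_all(mpsc_queue_head_t q)
{
	mpsc_queue_chain_t head, tail, cur;

	/* No prior acquire to thread a dependency through. */
	head = mpsc_queue_dequeue_batch(q, &tail, OS_ATOMIC_DEPENDENCY_NONE);
	while (head != NULL) {
		/*
		 * The _safe variant is used so the body may consume (and free)
		 * the element, mirroring _mpsc_daemon_queue_drain() below.
		 */
		mpsc_queue_batch_foreach_safe(cur, head, tail) {
			my_consume(mpsc_queue_element(cur, struct my_elem, me_link));
		}
		head = mpsc_queue_dequeue_batch(q, &tail, OS_ATOMIC_DEPENDENCY_NONE);
	}
}
#endif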

mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
{
	mpsc_queue_chain_t elm = NULL;
	if (cur == tail || cur == NULL) {
		return elm;
	}

	elm = os_atomic_load(&cur->mpqc_next, relaxed);
	if (__improbable(elm == NULL)) {
		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
	}
	return elm;
}

#pragma mark "GCD"-like facilities

static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);

/* thread based queues */

static void
_mpsc_daemon_queue_init(mpsc_daemon_queue_t dq, mpsc_daemon_init_options_t flags)
{
	if (flags & MPSC_DAEMON_INIT_INACTIVE) {
		os_atomic_init(&dq->mpd_state, MPSC_QUEUE_STATE_INACTIVE);
	}
}

static void
_mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
{
	mpsc_daemon_queue_t dq = param;
	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
	thread_t self = dq->mpd_thread;

	__builtin_assume(self != THREAD_NULL);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options |= TH_OPT_SYSTEM_CRITICAL;
	}

	assert(dq->mpd_thread == current_thread());
	_mpsc_daemon_queue_drain(dq, self);

	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
	}

	thread_block_parameter(_mpsc_queue_thread_continue, dq);
}

static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
{
	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
}

static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_queue_kind_t kind, mpsc_daemon_init_options_t flags)
{
	kern_return_t kr;

	*dq = (struct mpsc_daemon_queue){
		.mpd_kind   = kind,
		.mpd_invoke = invoke,
		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);

	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
	    &dq->mpd_thread);
	if (kr == KERN_SUCCESS) {
		thread_set_thread_name(dq->mpd_thread, name);
		thread_start_in_assert_wait(dq->mpd_thread,
		    assert_wait_queue(dq), CAST_EVENT64_T(dq),
		    THREAD_UNINT);
		thread_deallocate(dq->mpd_thread);
	}
	return kr;
}

kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
    mpsc_daemon_init_options_t flags)
{
	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
	           MPSC_QUEUE_KIND_THREAD, flags);
}
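
/*
 * Usage sketch, for illustration only (compiled out): a typical client of a
 * thread-backed daemon queue. The names `my_work`, `my_queue`, my_invoke(),
 * my_init(), my_submit() and my_process() are hypothetical, and
 * MPSC_QUEUE_NONE is assumed to be the "no options" value of
 * mpsc_queue_options_t from <kern/mpsc_queue.h>.
 */
#if 0
struct my_work {
	struct mpsc_queue_chain mw_link;
	/* ... payload ... */
};

static struct mpsc_daemon_queue my_queue;

/* Runs on the daemon thread, once per enqueued element. */
static void
my_invoke(mpsc_queue_chain_t elm, __unused mpsc_daemon_queue_t dq)
{
	struct my_work *w = mpsc_queue_element(elm, struct my_work, mw_link);

	my_process(w); /* may free `w`: it is no longer on the queue */
}

static void
my_init(void)
{
	kern_return_t kr;

	kr = mpsc_daemon_queue_init_with_thread(&my_queue, my_invoke,
	    MINPRI_KERNEL, "my.daemon", MPSC_DAEMON_INIT_NONE);
	if (kr != KERN_SUCCESS) {
		panic("my_init: unable to create daemon queue (%d)", kr);
	}
}

/* Producers may call this concurrently; the daemon drains in enqueue order. */
static void
my_submit(struct my_work *w)
{
	mpsc_daemon_enqueue(&my_queue, &w->mw_link, MPSC_QUEUE_NONE);
}
#endif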

/* thread-call based queues */

static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,
    thread_call_param_t arg1 __unused)
{
	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
}

static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
{
	thread_call_enter(dq->mpd_call);
}

void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind   = MPSC_QUEUE_KIND_THREAD_CALL,
		.mpd_invoke = invoke,
		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
	dq->mpd_call = thread_call_allocate_with_options(
		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
}
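
/*
 * Usage sketch, for illustration only (compiled out): the same pattern backed
 * by a thread call instead of a dedicated kernel thread, so draining happens
 * from thread-call context. `my_tc_queue` and my_invoke() are hypothetical,
 * and THREAD_CALL_PRIORITY_KERNEL is assumed to be one of the standard
 * thread_call_priority_t values.
 */
#if 0
static struct mpsc_daemon_queue my_tc_queue;

static void
my_tc_init(void)
{
	mpsc_daemon_queue_init_with_thread_call(&my_tc_queue, my_invoke,
	    THREAD_CALL_PRIORITY_KERNEL, MPSC_DAEMON_INIT_NONE);
}
#endif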

/* nested queues */

void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
    __unused mpsc_daemon_queue_t tq)
{
	mpsc_daemon_queue_t dq;
	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
	_mpsc_daemon_queue_drain(dq, NULL);
}

static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
{
	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
}

void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target,
    mpsc_daemon_init_options_t flags)
{
	*dq = (struct mpsc_daemon_queue){
		.mpd_kind   = MPSC_QUEUE_KIND_NESTED,
		.mpd_invoke = invoke,
		.mpd_target = target,
		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
	};
	_mpsc_daemon_queue_init(dq, flags);
}
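
/*
 * Usage sketch, for illustration only (compiled out): a nested queue drains
 * on its target's thread (or thread call). For that to work, the target queue
 * must use mpsc_daemon_queue_nested_invoke() as its invoke function, which is
 * what the deferred deallocation daemon at the bottom of this file does.
 * `my_nested_queue` and my_invoke() are hypothetical.
 */
#if 0
static struct mpsc_daemon_queue my_nested_queue;

static void
my_nested_init(mpsc_daemon_queue_t target)
{
	/*
	 * `target` must have been initialized with
	 * mpsc_daemon_queue_nested_invoke as its invoke function
	 * (compare thread_deallocate_daemon_register_queue() below).
	 */
	mpsc_daemon_queue_init_with_target(&my_nested_queue, my_invoke,
	    target, MPSC_DAEMON_INIT_NONE);
}
#endif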

/* enqueue, drain & cancelation */

static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
{
	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
	mpsc_queue_chain_t head, cur, tail;
	mpsc_daemon_queue_state_t st;

again:
	/*
	 * Most of the time we're woken up because we're dirty; in that case the
	 * atomic xor below sets DRAINING and clears WAKEUP in a single atomic
	 * operation.
	 *
	 * However, if we're woken up for cancelation, the state may be reduced to
	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
	 * We need to correct this and clear WAKEUP again to avoid looping below.
	 * This is safe to do as no one is allowed to enqueue more work after
	 * cancelation has happened.
	 *
	 * We use `st` as a dependency token to pair with the release fence in
	 * _mpsc_daemon_queue_enqueue(), which gives us the guarantee that the
	 * update to the tail of the MPSC queue that made it non-empty is visible
	 * to us.
	 */
	st = os_atomic_xor(&dq->mpd_state,
	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
	assert(st & MPSC_QUEUE_STATE_DRAINING);
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		assert(st & MPSC_QUEUE_STATE_CANCELED);
		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
	}

	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
	if ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
		do {
			mpsc_queue_batch_foreach_safe(cur, head, tail) {
				os_atomic_store(&cur->mpqc_next,
				    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
				invoke(cur, dq);
			}
		} while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep)));

		if (dq->mpd_options & MPSC_QUEUE_OPTION_BATCH) {
			invoke(MPSC_QUEUE_BATCH_END, dq);
		}
	}

	if (self) {
		assert_wait((event_t)dq, THREAD_UNINT);
	}

	/*
	 * Unlike GCD, no fence is necessary here: there is no concept similar
	 * to "dispatch_sync()" that would require changes this thread made to be
	 * visible to other threads as part of the mpsc_daemon_queue machinery.
	 *
	 * Making updates that happened on the daemon queue visible to other threads
	 * is the responsibility of the client.
	 */
	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);

	/*
	 * If a wakeup happened while we were draining, the queue did an
	 * [ empty -> non-empty ] transition during our drain.
	 *
	 * Chances are we already observed and drained everything,
	 * but we need to be absolutely sure, so start a drain again:
	 * the enqueuer observed the DRAINING bit and skipped calling
	 * _mpsc_daemon_queue_wakeup().
	 */
	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
		}
		goto again;
	}

	/* dereferencing `dq` past this point is unsafe */

	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		thread_wakeup(&dq->mpd_state);
		if (self) {
			clear_wait(self, THREAD_AWAKENED);
			thread_terminate_self();
			__builtin_unreachable();
		}
	}
}
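
/*
 * Illustration only (compiled out): the two ways the atomic xor at the top of
 * _mpsc_daemon_queue_drain() can play out on the state bits.
 */
#if 0
static void
my_drain_xor_illustration(void)
{
	mpsc_daemon_queue_state_t s;

	/*
	 * Common case: woken up because the queue became non-empty (WAKEUP set).
	 * A single xor clears WAKEUP and sets DRAINING at the same time.
	 */
	s = MPSC_QUEUE_STATE_WAKEUP;
	s ^= MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP;
	assert(s == MPSC_QUEUE_STATE_DRAINING);

	/*
	 * Cancelation case: only CANCELED is set, so the same xor also *sets*
	 * WAKEUP, which the drain immediately clears back with os_atomic_andnot()
	 * so the "wakeup happened while draining" check doesn't loop spuriously.
	 */
	s = MPSC_QUEUE_STATE_CANCELED;
	s ^= MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP;
	assert(s == (MPSC_QUEUE_STATE_CANCELED | MPSC_QUEUE_STATE_DRAINING |
	    MPSC_QUEUE_STATE_WAKEUP));
}
#endif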

static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
{
	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		_mpsc_daemon_queue_nested_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		_mpsc_queue_thread_wakeup(dq);
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		_mpsc_queue_thread_call_wakeup(dq);
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
}

static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
{
	mpsc_daemon_queue_state_t st;

	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
		/*
		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
		 */
		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
			panic("mpsc_queue[%p]: use after cancelation", dq);
		}

		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP |
		    MPSC_QUEUE_STATE_INACTIVE)) == 0) {
			_mpsc_daemon_queue_wakeup(dq);
		}
	}
}

void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
    mpsc_queue_options_t options)
{
	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		disable_preemption();
	}

	_mpsc_daemon_queue_enqueue(dq, elm);

	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
		enable_preemption();
	}
}

void
mpsc_daemon_queue_activate(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	st = os_atomic_andnot_orig(&dq->mpd_state,
	    MPSC_QUEUE_STATE_INACTIVE, relaxed);
	if ((st & MPSC_QUEUE_STATE_WAKEUP) && (st & MPSC_QUEUE_STATE_INACTIVE)) {
		_mpsc_daemon_queue_wakeup(dq);
	}
}

void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
{
	mpsc_daemon_queue_state_t st;

	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);

	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
	}
	if (__improbable(st & MPSC_QUEUE_STATE_INACTIVE)) {
		panic("mpsc_queue[%p]: queue is inactive (%x)", dq, st);
	}

	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
		clear_wait(current_thread(), THREAD_AWAKENED);
	} else {
		disable_preemption();
		_mpsc_daemon_queue_wakeup(dq);
		enable_preemption();
		thread_block(THREAD_CONTINUE_NULL);
	}

	switch (dq->mpd_kind) {
	case MPSC_QUEUE_KIND_NESTED:
		dq->mpd_target = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD:
	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
		dq->mpd_thread = NULL;
		break;
	case MPSC_QUEUE_KIND_THREAD_CALL:
		thread_call_cancel_wait(dq->mpd_call);
		thread_call_free(dq->mpd_call);
		dq->mpd_call = NULL;
		break;
	default:
		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
	}
	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
}

#pragma mark deferred deallocation daemon

static struct mpsc_daemon_queue thread_deferred_deallocation_queue;

void
thread_deallocate_daemon_init(void)
{
	kern_return_t kr;

	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL,
	    MPSC_DAEMON_INIT_NONE);
	if (kr != KERN_SUCCESS) {
		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
	}
}

void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
    mpsc_daemon_invoke_fn_t invoke)
{
	mpsc_daemon_queue_init_with_target(dq, invoke,
	    &thread_deferred_deallocation_queue, MPSC_DAEMON_INIT_NONE);
}
487