/*
 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/*
 * @OSF_FREE_COPYRIGHT@
 */
/*
 * Mach Operating System
 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  [email protected]
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 */
/*
 *	File:	ipc/ipc_mqueue.c
 *	Author:	Rich Draves
 *	Date:	1989
 *
 *	Functions to manipulate IPC message queues.
 */
/*
 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
 * support for mandatory and extensible security protections.  This notice
 * is included in support of clause 2.2 (b) of the Apple Public License,
 * Version 2.0.
 */


#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>

#include <kern/assert.h>
#include <kern/counter.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>

#include <ipc/port.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_right.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>

#if MACH_FLIPC
#include <ipc/flipc.h>
#endif

#ifdef __LP64__
#include <vm/vm_map.h>
#endif

#include <sys/event.h>

extern char     *proc_name_address(void *p);

int ipc_mqueue_full;            /* address is event for queue space */
int ipc_mqueue_rcv;             /* address is event for message arrival */

/* forward declarations */
static void ipc_mqueue_receive_results(wait_result_t result);

#if MACH_FLIPC
static void ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option64_t option,
	thread_t            thread);
#endif /* MACH_FLIPC */

/* Deliver message to message queue or waiting receiver */
static void ipc_mqueue_post(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option64_t     option);

/*
 *	Routine:	ipc_mqueue_init
 *	Purpose:
 *		Initialize a newly-allocated message queue.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t            mqueue)
{
	ipc_kmsg_queue_init(&mqueue->imq_messages);
	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
	klist_init(&mqueue->imq_klist);
}
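
/*
 * Note: an ipc_mqueue is embedded in its owning port, so the routines
 * below recover the owning object with ip_from_mq() and take the port
 * lock (ip_mq_lock) rather than a separate mqueue lock.
 */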

/*
 *	Routine:	ipc_mqueue_msg_too_large
 *	Purpose:
 *		Return true if kmsg is too large to be received:
 *
 *		If MACH64_RCV_LINEAR_VECTOR:
 *		    - combined message buffer is not large enough
 *		      to fit both the message (plus trailer) and
 *		      auxiliary data.
 *		Otherwise:
 *		    - message buffer is not large enough
 *		    - auxiliary buffer is not large enough:
 *		      (1) kmsg is a vector with aux, but user expects
 *		          a scalar kmsg (ith_max_asize is 0)
 *		      (2) kmsg is a vector with aux, but user aux
 *		          buffer is not large enough.
 */
static bool
ipc_mqueue_msg_too_large(
	mach_msg_size_t         msg_size,
	mach_msg_size_t         trailer_size,
	mach_msg_size_t         aux_size,
	mach_msg_option64_t     options,
	mach_msg_recv_bufs_t   *recv_bufs)
{
	mach_msg_size_t max_msg_size = recv_bufs->recv_msg_size;
	mach_msg_size_t max_aux_size = recv_bufs->recv_aux_size;

	if (max_aux_size != 0) {
		assert(options & MACH64_MSG_VECTOR);
	}

	if (options & MACH64_RCV_LINEAR_VECTOR) {
		assert(max_aux_size == 0);
		assert(options & MACH64_MSG_VECTOR);

		if (max_msg_size < msg_size + trailer_size + aux_size) {
			return true;
		}
	} else {
		if (max_msg_size < msg_size + trailer_size) {
			return true;
		}

		/*
		 * Only return too large if MACH64_MSG_VECTOR.
		 *
		 * Silently drop aux data when the receiver is not expecting
		 * it, for compatibility reasons.
		 */
		if ((options & MACH64_MSG_VECTOR) && max_aux_size < aux_size) {
			return true;
		}
	}

	return false;
}
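
/*
 * Worked example (illustrative, not from the original source): with
 * msg_size = 0x100, trailer_size = 0x44 and aux_size = 0x20, a
 * MACH64_RCV_LINEAR_VECTOR receive needs a single buffer of at least
 * 0x164 bytes, while a non-linear vector receive needs
 * recv_msg_size >= 0x144 and recv_aux_size >= 0x20 (or the aux data is
 * silently dropped when MACH64_MSG_VECTOR is not set).
 */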

/*
 *	Routine:	ipc_mqueue_add_locked
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting the port will wakeup
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		Port and Pset both locked.
 */
kern_return_t
ipc_mqueue_add_locked(
	ipc_mqueue_t    port_mqueue,
	ipc_pset_t      pset,
	waitq_link_t   *linkp)
{
	ipc_port_t       port = ip_from_mq(port_mqueue);
	struct waitq_set *wqset = &pset->ips_wqset;
	circle_queue_t   kmsgq = &port_mqueue->imq_messages;
	kern_return_t    kr = KERN_SUCCESS;
	ipc_kmsg_t       kmsg;

	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
	if (kr != KERN_SUCCESS) {
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Let's get them together.
	 *
	 * Only consider this set however, as the other ones have been
	 * posted to already.
	 */
	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
		mach_msg_size_t msize, tsize, asize;
		thread_t th;

		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* port and pset still locked, thread not runnable */

		if (th == THREAD_NULL) {
			/*
			 * Didn't find a thread to wake up but messages
			 * are enqueued, prepost the set instead,
			 * as calling waitq_wakeup64_identify_locked()
			 * on the set directly will not take care of it.
			 */
			waitq_link_prepost_locked(&port->ip_waitq, wqset);
			break;
		}

		/*
		 * Because we hold the thread off the runqueue at this point,
		 * it's safe to modify ith_ fields on the thread, as
		 * until it is resumed, it must be off core or in between
		 * the assert wait and returning from the continuation.
		 */

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
#if MACH_FLIPC
			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
				/*
				 * wakeup the peeking thread, but
				 * continue to loop over the threads
				 * waiting on the port's mqueue to see
				 * if there are any actual receivers
				 */
				ipc_mqueue_peek_on_thread_locked(port_mqueue,
				    th->ith_option, th);
			}
#endif /* MACH_FLIPC */

			waitq_resume_identified_thread(wqset, th,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			continue;
		}

		/*
		 * Found a receiver. See if they can handle the message
		 * correctly (the message is not too large for them, or
		 * they didn't care to be informed that the message was
		 * too large).  If they can't handle it, take them off
		 * the list and let them go back and figure it out and
		 * just move onto the next.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, th->map);
		tsize = ipc_kmsg_trailer_size(th->ith_option, th->map);
		asize = kmsg->ikm_aux_size;

		if (ipc_mqueue_msg_too_large(msize, tsize, asize, th->ith_option,
		    &th->ith_recv_bufs)) {
			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = msize;
			th->ith_asize = asize;
			if (th->ith_option & MACH_RCV_LARGE) {
				/*
				 * let the thread go without the message
				 */
				th->ith_receiver_name = port_mqueue->imq_receiver_name;
				th->ith_kmsg = IKM_NULL;
				th->ith_seqno = 0;

				waitq_resume_identified_thread(wqset, th,
				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

				continue; /* find another thread */
			}
		} else {
			th->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * This thread is going to take this message,
		 * so give it the message.
		 */
		ipc_kmsg_rmqueue(kmsgq, kmsg);

#if MACH_FLIPC
		mach_node_t  node = kmsg->ikm_node;
#endif

		ipc_mqueue_release_msgcount(port_mqueue);

		th->ith_kmsg = kmsg;
		th->ith_seqno = port_mqueue->imq_seqno++;

		waitq_resume_identified_thread(wqset, th,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

#if MACH_FLIPC
		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
			flipc_msg_ack(node, port_mqueue, TRUE);
		}
#endif
	}

	return KERN_SUCCESS;
}
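
/*
 * Usage sketch (hypothetical caller, for illustration only; link
 * allocation and error handling omitted, and ips_mq_lock/ips_mq_unlock
 * are assumed names for the pset lock macros): a routine wiring a
 * receive right into a set would hold both locks across the call, e.g.
 *
 *	ip_mq_lock(port);
 *	ips_mq_lock(pset);
 *	kr = ipc_mqueue_add_locked(&port->ip_messages, pset, &link);
 *	ips_mq_unlock(pset);
 *	ip_mq_unlock(port);
 */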


/*
 *	Routine:	ipc_port_has_klist
 *	Purpose:
 *		Returns whether the given port imq_klist field can be used as a klist.
 */
bool
ipc_port_has_klist(ipc_port_t port)
{
	return !port->ip_specialreply &&
	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
}

static inline struct klist *
ipc_object_klist(ipc_object_t object)
{
	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);

		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
	}
	return &ips_object_to_pset(object)->ips_klist;
}

/*
 *	Routine:	ipc_mqueue_changed
 *	Purpose:
 *		Wake up receivers waiting in a message queue.
 *	Conditions:
 *		The object containing the message queue is locked.
 */
void
ipc_mqueue_changed(
	ipc_space_t         space,
	struct waitq       *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing.
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may already have sync IPC pending
		 * on this port, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);
		if (!port->ip_specialreply) {
			ipc_port_adjust_sync_link_state_locked(port,
			    PORT_SYNC_LINK_ANY, NULL);
		}
	} else {
		klist_init(klist);
	}

	/*
	 * do not pass WAITQ_UPDATE_INHERITOR, ipc_port_destroy()
	 * needs to handle this manually, and the port lock
	 * is the waitq lock, so there's really no inefficiency there.
	 */
	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
}


/*
 *	Routine:	ipc_mqueue_send_locked
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		port is locked.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 *		MACH_SEND_NO_BUFFER	Kernel queue is full; caller still has message.
 *		MACH_SEND_INVALID_DEST	Queue is being destroyed; caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option64_t     option,
	mach_msg_timeout_t      send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;
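		/*
		 * send_timeout is a mach_msg_timeout_t, expressed in
		 * milliseconds: e.g. send_timeout = 10 yields a deadline
		 * 10ms (10 * 1000 * NSEC_PER_USEC ns) from now.
		 */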
		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}

/*
 *	Routine:	ipc_mqueue_override_send_locked
 *	Purpose:
 *		Set an override qos on the first message in the queue
 *		(if the queue is full). This is a send-possible override
 *		that will go away as soon as we drain a message from the
 *		queue.
 *
 *	Conditions:
 *		The port corresponding to mqueue is locked.
 *		The caller holds a reference on the message queue.
 */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t        mqueue,
	mach_msg_qos_t      qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
	}
}

/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.
 *
 *	Conditions:
 *		The port corresponding to the message queue is locked.
 *		The message corresponding to this reference is off the queue.
 *		There is no need to pass reserved preposts because this will
 *		never prepost to anyone.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}

/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		port is unlocked
 *		If we need to queue, our space in the message queue is reserved.
 */
static void
ipc_mqueue_post(
	ipc_mqueue_t               mqueue,
	ipc_kmsg_t                 kmsg,
	mach_msg_option64_t        option __unused)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 *	While the msg queue is locked, we have control of the
	 *	kmsg, so the ref in it for the port is still good.
	 *
	 *	Check for a receiver for the message.
	 */
	ip_mq_lock(port);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		mach_msg_size_t msize, tsize, asize;
		thread_t receiver;

		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread not runnable */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports. port-sets knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount == 0) {
				/*
				 * The message queue must belong
				 * to an inactive port, so just destroy
				 * the message and pretend it was posted.
				 */
				destroy_msg = TRUE;
			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
				/*
				 * queue was not empty and qos
				 * didn't change, nothing to do.
				 */
			} else if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				/*
				 * queue was empty or qos changed
				 * we need to tell kqueue, unless
				 * the space is getting torn down
				 */
				KNOTE(&port->ip_klist, 0);
			}
			break;
		}

#if MACH_FLIPC
		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			break; /* Message was posted, so break out of loop */
		}
#endif /* MACH_FLIPC */

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly. Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		tsize = ipc_kmsg_trailer_size(receiver->ith_option, receiver->map);
		asize = kmsg->ikm_aux_size;

		if (ipc_mqueue_msg_too_large(msize, tsize, asize, receiver->ith_option,
		    &receiver->ith_recv_bufs)) {
			receiver->ith_msize = msize;
			receiver->ith_asize = asize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;

		waitq_resume_identified_thread(waitq, receiver,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
	}

	counter_inc(&current_task()->messages_sent);
	return;
}


static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t                self = current_thread();
	mach_msg_option64_t     option64 = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option64 & MACH_RCV_LARGE) {
				return;
			}
			return;
		case MACH_MSG_SUCCESS:
			return;
#if MACH_FLIPC
		case MACH_PEEK_READY:
			return;
#endif /* MACH_FLIPC */

		default:
			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
	}
}

void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}

/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off through the ith_kmsg
 *		field of the thread's receive continuation state.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
 *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
 *					auxiliary data size returned in ith_asize.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 *
 */

void
ipc_mqueue_receive(
	struct waitq           *waitq,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread,
	bool                    has_continuation)
{
	wait_result_t           wresult;

	assert(thread == current_thread());
	waitq_lock(waitq);

	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, rcv_timeout,
	    interruptible, thread);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (has_continuation) {
			wresult = thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */
		}
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}

/*
 *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
 *	Purpose:
 *		Receive a message from a message queue using a specified thread.
 *		If no message available, assert_wait on the appropriate waitq.
 *
 *	Conditions:
 *		Assumes thread is self.
 *		The port/port-set waitq is locked on entry, unlocked on return.
 *		May have assert-waited. Caller must block in those cases.
 */
wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(
	struct waitq           *waitq,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	mach_msg_option64_t     option64 = thread->ith_option;
	ipc_object_t            object = io_from_waitq(waitq);
	ipc_port_t              port = IP_NULL;
	wait_result_t           wresult;
	uint64_t                deadline;
	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;

	assert(thread == current_thread());

	if (waitq_type(waitq) == WQT_PORT_SET) {
		ipc_pset_t pset = ips_object_to_pset(object);
		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
		struct waitq *port_wq;

		/*
		 * Put the port at the back of the prepost list
		 * if it's not a PEEK.
		 *
		 * Might drop the pset lock temporarily.
		 */
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			wqs_flags |= WQS_PREPOST_PEEK;
		}
#endif /* MACH_FLIPC */
		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);

		/* Returns with port locked */

		if (port_wq != NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_wq. We have instructed the prepost
			 * iteration logic to leave both the port_wq and the
			 * set waitq locked.
			 *
			 * Continue on to handling the message with just
			 * the port waitq locked.
			 */
			io_unlock(object);
			port = ip_from_waitq(port_wq);
		}
	} else if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
			port = IP_NULL;
		}
	} else {
		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
	}

	if (port) {
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
			    option64, thread);
		} else
#endif /* MACH_FLIPC */
		{
			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
			    option64, thread);
		}
		ip_mq_unlock(port);
		return THREAD_NOT_WAITING;
	}

	if (!waitq_is_valid(waitq)) {
		/* someone raced us to destroy this mqueue/port! */
		io_unlock(object);
		/*
		 * ipc_mqueue_receive_results updates the thread's ith_state
		 * TODO: differentiate between rights being moved and
		 * rights/ports being destroyed (21885327)
		 */
		return THREAD_RESTART;
	}

	/*
	 * Looks like we'll have to block.  The waitq we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
		io_unlock(object);
		thread->ith_state = MACH_RCV_TIMED_OUT;
		return THREAD_NOT_WAITING;
	}

	thread->ith_state = MACH_RCV_IN_PROGRESS;
#if MACH_FLIPC
	if (option64 & MACH64_PEEK_MSG) {
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	}
#endif /* MACH_FLIPC */

	if (option64 & MACH_RCV_TIMEOUT) {
		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
	} else {
		deadline = 0;
	}

	/*
	 * Threads waiting on a reply port (not portset)
	 * will wait on its receive turnstile.
	 *
	 * Donate the waiting thread's turnstile and
	 * set up an inheritor for the special reply port.
	 * Based on the state of the special reply
	 * port, the inheritor would be either the send
	 * turnstile of the connection port on which
	 * the send of the sync ipc would happen, or the
	 * workloop's turnstile that would reply to
	 * the sync ipc message.
	 *
	 * Pass the mqueue waitq to waitq_assert_wait to
	 * support port set wakeup. The mqueue waitq of the port
	 * will be converted to the turnstile waitq
	 * in waitq_assert_wait instead of the global waitqs.
	 */
	if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		rcv_turnstile = turnstile_prepare((uintptr_t)port,
		    port_rcv_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_recv_update_inheritor(port, rcv_turnstile,
		    TURNSTILE_DELAYED_UPDATE);
	}

	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
	wresult = waitq_assert_wait64_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    interruptible,
	    TIMEOUT_URGENCY_USER_NORMAL,
	    deadline,
	    TIMEOUT_NO_LEEWAY,
	    thread);
	if (wresult == THREAD_AWAKENED) {
		/*
		 * The first thing we did was to look for preposts
		 * (using waitq_set_first_prepost() for sets, or looking
		 * at the port's queue for ports).
		 *
		 * Since we found none, we kept the waitq locked.
		 *
		 * It ensures that waitq_assert_wait64_locked() can't
		 * find pre-posts either, won't drop the waitq lock
		 * either (even for a set), and can't return THREAD_AWAKENED.
		 */
		panic("ipc_mqueue_receive_on_thread: sleep walking");
	}

	io_unlock(object);

	/*
	 * After this point, a waiting thread could be found by the wakeup
	 * identify path, and the other side now owns the ith_ fields until
	 * this thread blocks and resumes in the continuation
	 */

	/* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
	if (rcv_turnstile != TURNSTILE_NULL) {
		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
	}
	/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */

	return wresult;
}

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_peek_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before it had to block. Tell the thread about the message
 *		queue, but don't pick off any messages.
 *	Conditions:
 *		port_mq locked
 *		at least one message on port_mq's message queue
 *
 *	Returns: (on thread->ith_state)
 *		MACH_PEEK_READY		ith_peekq contains a message queue
 */
void
ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	__assert_only mach_msg_option64_t option64,
	thread_t            thread)
{
	assert(option64 & MACH64_PEEK_MSG);
	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

	/*
	 * Take a reference on the mqueue's associated port:
	 * the peeking thread will be responsible to release this reference
	 */
	ip_validate(ip_from_mq(port_mq));
	ip_reference(ip_from_mq(port_mq));
	thread->ith_peekq = port_mq;
	thread->ith_state = MACH_PEEK_READY;
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_select_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before it had to block.  Pick the message off the queue and
 *		"post" it to the thread.
 *	Conditions:
 *		port locked.
 *		thread not locked.
 *		There is a message.
 *		No need to reserve prepost objects - it will never prepost.
 *
 *	Returns:
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE	May or may not have pulled it off the
 *					queue, but the message is too large.
 */
void
ipc_mqueue_select_on_thread_locked(
	ipc_mqueue_t            port_mq,
	mach_msg_option64_t     options,
	thread_t                thread)
{
	mach_msg_size_t msize, tsize, asize;
	ipc_kmsg_t kmsg;

	mach_msg_return_t mr = MACH_MSG_SUCCESS;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).
	 */
	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
	tsize = ipc_kmsg_trailer_size(options, thread->map);
	asize = kmsg->ikm_aux_size;

	if (ipc_mqueue_msg_too_large(msize, tsize, asize, options,
	    &thread->ith_recv_bufs)) {
		mr = MACH_RCV_TOO_LARGE;
		if (options & MACH_RCV_LARGE) {
			(void)ipc_kmsg_validate_signature(kmsg);
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = msize;
			thread->ith_asize = asize;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
	}
#endif
	ipc_mqueue_release_msgcount(port_mq);
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	counter_inc(&current_task()->messages_received);
	return;
}

/*
 *	Routine:	ipc_mqueue_peek_locked
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The io object corresponding to mq is locked by callers.
 *		Other locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek_locked(
	ipc_mqueue_t            mq,
	mach_port_seqno_t      *seqnop,
	mach_msg_size_t        *msg_sizep,
	mach_msg_id_t          *msg_idp,
	mach_msg_max_trailer_t *msg_trailerp,
	ipc_kmsg_t             *kmsgp)
{
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno, msgoff;
	unsigned res = 0;
	mach_msg_header_t *hdr;

	seqno = 0;
	if (seqnop != NULL) {
		seqno = *seqnop;
	}

	if (seqno == 0) {
		seqno = mq->imq_seqno;
		msgoff = 0;
	} else if (seqno >= mq->imq_seqno &&
	    seqno < mq->imq_seqno + mq->imq_msgcount) {
		msgoff = seqno - mq->imq_seqno;
	} else {
		goto out;
	}

	/* look for the message that would match that seqno */
	kmsgq = &mq->imq_messages;
	kmsg = ipc_kmsg_queue_first(kmsgq);
	while (msgoff-- && kmsg != IKM_NULL) {
		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
	}
	if (kmsg == IKM_NULL) {
		goto out;
	}

	/*
	 * Validate the kmsg signature before doing anything with it. Since we
	 * are holding the mqueue lock here, and only the header + trailer will
	 * be peeked on, just do a partial validation to finish quickly.
	 *
	 * Partial kmsg signature validation is only supported on PAC devices.
	 */
	(void)ipc_kmsg_validate_signature(kmsg);

	hdr = ikm_header(kmsg);
	/* found one - return the requested info */
	if (seqnop != NULL) {
		*seqnop = seqno;
	}
	if (msg_sizep != NULL) {
		*msg_sizep = hdr->msgh_size;
	}
	if (msg_idp != NULL) {
		*msg_idp = hdr->msgh_id;
	}
	if (msg_trailerp != NULL) {
		*msg_trailerp = *ipc_kmsg_get_trailer(kmsg);
	}
	if (kmsgp != NULL) {
		*kmsgp = kmsg;
	}

	res = 1;

out:
	return res;
}


/*
 *	Routine:	ipc_mqueue_peek
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The ipc_mqueue_t is unlocked.
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,
    mach_port_seqno_t * seqnop,
    mach_msg_size_t * msg_sizep,
    mach_msg_id_t * msg_idp,
    mach_msg_max_trailer_t * msg_trailerp,
    ipc_kmsg_t *kmsgp)
{
	ipc_port_t port = ip_from_mq(mq);
	unsigned res;

	ip_mq_lock(port);

	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
	    msg_trailerp, kmsgp);

	ip_mq_unlock(port);
	return res;
}
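
/*
 * Usage sketch (hypothetical caller, for illustration only): peek at the
 * first queued message and read its size and id without dequeuing it.
 *
 *	mach_port_seqno_t seqno = 0;	// 0 means "first message in the queue"
 *	mach_msg_size_t size;
 *	mach_msg_id_t id;
 *
 *	if (ipc_mqueue_peek(&port->ip_messages, &seqno, &size, &id,
 *	    NULL, NULL)) {
 *		// size/id now describe the message that would be received
 *		// next; seqno holds its sequence number
 *	}
 */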

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_release_peek_ref
 *	Purpose:
 *		Release the reference on an mqueue's associated port which was
 *		granted to a thread in ipc_mqueue_peek_on_thread (on the
 *		MACH64_PEEK_MSG thread wakeup path).
 *
 *	Conditions:
 *		The ipc_mqueue_t should be locked on entry.
 *		The ipc_mqueue_t will be _unlocked_ on return
 *			(and potentially invalid!)
 *
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);

	ip_mq_lock_held(port);

	/*
	 * clear any preposts this mq may have generated
	 * (which would cause subsequent immediate wakeups)
	 */
	waitq_clear_prepost_locked(&port->ip_waitq);

	ip_mq_unlock(port);

	/*
	 * release the port reference: we need to do this outside the lock
	 * because we might be holding the last port reference!
	 */
	ip_release(port);
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 *	rouse all blocked senders
	 *	(don't boost anyone - we're tearing this queue down)
	 *	(never preposts)
	 */
	port->ip_fullwaiters = false;

	if (send_turnstile != TURNSTILE_NULL) {
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
	}

#if MACH_FLIPC
	ipc_kmsg_t kmsg;

	cqe_foreach_element_safe(kmsg, &mqueue->imq_messages, ikm_link) {
		if (MACH_NODE_VALID(kmsg->ikm_node) &&
		    FPORT_VALID(mqueue->imq_fport)) {
			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
		}
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 */

	waitq_invalidate(&port->ip_waitq);

	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);

	return reap;
}

/*
 *	Routine:	ipc_mqueue_set_qlimit_locked
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.
 *	Conditions:
 *		Port locked.
 */

void
ipc_mqueue_set_qlimit_locked(
	ipc_mqueue_t           mqueue,
	mach_port_msgcount_t   qlimit)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	/* wake up senders allowed by the new qlimit */
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;
		struct turnstile *send_turnstile = port_send_turnstile(port);

		/* caution: wakeup, qlimit are unsigned */
		wakeup = qlimit - mqueue->imq_qlimit;

		for (i = 0; i < wakeup; i++) {
			/*
			 * boost the priority of the awoken thread
			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
			 * the message queue slot we've just reserved.
			 *
			 * NOTE: this will never prepost
			 */
			if (send_turnstile == TURNSTILE_NULL ||
			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
			    IPC_MQUEUE_FULL,
			    THREAD_AWAKENED,
			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
				port->ip_fullwaiters = false;
				break;
			}
			mqueue->imq_msgcount++;  /* give it to the awakened thread */
		}
	}
	mqueue->imq_qlimit = (uint16_t)qlimit;
}
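
/*
 * Worked example (illustrative, not from the original source): raising
 * qlimit from 5 to 8 on a full queue makes wakeup = 3, so up to three
 * blocked senders are resumed, each granted a reserved slot via
 * imq_msgcount++; if fewer than three senders are actually waiting, the
 * loop stops early and ip_fullwaiters is cleared.
 */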

/*
 *	Routine:	ipc_mqueue_set_seqno_locked
 *	Purpose:
 *		Changes an mqueue's sequence number.
 *	Conditions:
 *		Caller holds a reference to the queue's containing object.
 */
void
ipc_mqueue_set_seqno_locked(
	ipc_mqueue_t            mqueue,
	mach_port_seqno_t       seqno)
{
	mqueue->imq_seqno = seqno;
}


/*
 *	Routine:	ipc_mqueue_copyin
 *	Purpose:
 *		Convert a name in a space to a message queue.
 *	Conditions:
 *		Nothing locked.  If successful, the caller gets a ref
 *		for the object.  This ref ensures the continued existence
 *		of the queue.
 *	Returns:
 *		MACH_MSG_SUCCESS	Found a message queue.
 *		MACH_RCV_INVALID_NAME	The space is dead.
 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
 *		MACH_RCV_INVALID_NAME
 *			The denoted right is not receive or port set.
 *		MACH_RCV_IN_SET		Receive right is a member of a set.
 */

mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t             space,
	mach_port_name_t        name,
	ipc_object_t            *objectp)
{
	ipc_entry_bits_t bits;
	ipc_object_t object;
	kern_return_t kr;

	kr = ipc_right_lookup_read(space, name, &bits, &object);
	if (kr != KERN_SUCCESS) {
		return MACH_RCV_INVALID_NAME;
	}
	/* object is locked and active */

	if (bits & MACH_PORT_TYPE_RECEIVE) {
		__assert_only ipc_port_t port = ip_object_to_port(object);
		assert(ip_get_receiver_name(port) == name);
		assert(ip_in_space(port, space));
	}
	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
		io_reference(object);
		io_unlock(object);
	} else {
		io_unlock(object);
		/* guard exception if we never held the receive right in this entry */
		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
		}
		return MACH_RCV_INVALID_NAME;
	}

	*objectp = object;
	return MACH_MSG_SUCCESS;
}
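
/*
 * Usage sketch (hypothetical caller, for illustration only): resolve a
 * receive right or port-set name before receiving, then drop the ref
 * when done.
 *
 *	ipc_object_t object;
 *	mach_msg_return_t mr;
 *
 *	mr = ipc_mqueue_copyin(current_space(), name, &object);
 *	if (mr == MACH_MSG_SUCCESS) {
 *		// ... receive via the object's waitq ...
 *		io_release(object);	// drop the ref copyin granted
 *	}
 */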
1601