/*
 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/*
 * @OSF_FREE_COPYRIGHT@
 */
/*
 * Mach Operating System
 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  [email protected]
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 */
/*
 *	File:	ipc/ipc_mqueue.c
 *	Author:	Rich Draves
 *	Date:	1989
 *
 *	Functions to manipulate IPC message queues.
 */
/*
 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
 * support for mandatory and extensible security protections.  This notice
 * is included in support of clause 2.2 (b) of the Apple Public License,
 * Version 2.0.
 */

#include <mach/port.h>
#include <mach/message.h>
#include <mach/sync_policy.h>

#include <kern/assert.h>
#include <kern/counter.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
#include <kern/misc_protos.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/waitq.h>

#include <ipc/port.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_right.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>

#if MACH_FLIPC
#include <ipc/flipc.h>
#endif

#ifdef __LP64__
#include <vm/vm_map.h>
#endif

#include <sys/event.h>

extern char *proc_name_address(void *p);

int ipc_mqueue_full;            /* address is event for queue space */
int ipc_mqueue_rcv;             /* address is event for message arrival */

/* forward declarations */
static void ipc_mqueue_receive_results(wait_result_t result);

#if MACH_FLIPC
static void ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option64_t option,
	thread_t            thread);
#endif /* MACH_FLIPC */

/* Deliver message to message queue or waiting receiver */
static void ipc_mqueue_post(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option64_t option);
/*
 *	Routine:	ipc_mqueue_init
 *	Purpose:
 *		Initialize a newly-allocated message queue.
 */
void
ipc_mqueue_init(
	ipc_mqueue_t        mqueue)
{
	ipc_kmsg_queue_init(&mqueue->imq_messages);
	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
	klist_init(&mqueue->imq_klist);
}
139
140 /*
141 * Routine: ipc_mqueue_msg_too_large
142 * Purpose:
143 * Return true if kmsg is too large to be received:
144 *
145 * If MACH64_RCV_LINEAR_VECTOR:
146 * - combined message buffer is not large enough
147 * to fit both the message (plus trailer) and
148 * auxiliary data.
149 * Otherwise:
150 * - message buffer is not large enough
151 * - auxiliary buffer is not large enough:
152 * (1) kmsg is a vector with aux, but user expects
153 * a scalar kmsg (ith_max_asize is 0)
154 * (2) kmsg is a vector with aux, but user aux
155 * buffer is not large enough.
156 */
157 static bool
ipc_mqueue_msg_too_large(mach_msg_size_t msg_size,mach_msg_size_t trailer_size,mach_msg_size_t aux_size,mach_msg_option64_t options,mach_msg_recv_bufs_t * recv_bufs)158 ipc_mqueue_msg_too_large(
159 mach_msg_size_t msg_size,
160 mach_msg_size_t trailer_size,
161 mach_msg_size_t aux_size,
162 mach_msg_option64_t options,
163 mach_msg_recv_bufs_t *recv_bufs)
164 {
165 mach_msg_size_t max_msg_size = recv_bufs->recv_msg_size;
166 mach_msg_size_t max_aux_size = recv_bufs->recv_aux_size;
167
168 if (max_aux_size != 0) {
169 assert(options & MACH64_MSG_VECTOR);
170 }
171
172 if (options & MACH64_RCV_LINEAR_VECTOR) {
173 assert(max_aux_size == 0);
174 assert(options & MACH64_MSG_VECTOR);
175
176 if (max_msg_size < msg_size + trailer_size + aux_size) {
177 return true;
178 }
179 } else {
180 if (max_msg_size < msg_size + trailer_size) {
181 return true;
182 }
183
184 /*
185 * only return too large if MACH64_MSG_VECTOR.
186 *
187 * silently drop aux data when receiver is not expecting it for compat
188 * reasons.
189 */
190 if ((options & MACH64_MSG_VECTOR) && max_aux_size < aux_size) {
191 return true;
192 }
193 }
194
195 return false;
196 }
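
/*
 * Illustrative arithmetic for the scalar (non-linear-vector) case above,
 * assuming a 1024-byte recv_msg_size: a 992-byte message with a 52-byte
 * trailer is "too large" (992 + 52 > 1024), while the same message with
 * a 32-byte trailer fits exactly (992 + 32 == 1024).
 */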

/*
 *	Routine:	ipc_mqueue_add_locked
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting the port will wakeup
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		Port and Pset both locked.
 */
kern_return_t
ipc_mqueue_add_locked(
	ipc_mqueue_t        port_mqueue,
	ipc_pset_t          pset,
	waitq_link_t        *linkp)
{
	ipc_port_t port = ip_from_mq(port_mqueue);
	struct waitq_set *wqset = &pset->ips_wqset;
	circle_queue_t kmsgq = &port_mqueue->imq_messages;
	kern_return_t kr = KERN_SUCCESS;
	ipc_kmsg_t kmsg;

	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
	if (kr != KERN_SUCCESS) {
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Let's get them together.
	 *
	 * Only consider this set however, as the other ones have been
	 * posted to already.
	 */
	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
		mach_msg_size_t msize, tsize, asize;
		thread_t th;

		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* port and pset still locked, thread not runnable */

		if (th == THREAD_NULL) {
			/*
			 * Didn't find a thread to wake up but messages
			 * are enqueued, prepost the set instead,
			 * as calling waitq_wakeup64_identify_locked()
			 * on the set directly will not take care of it.
			 */
			waitq_link_prepost_locked(&port->ip_waitq, wqset);
			break;
		}

		/*
		 * Because we hold the thread off the runqueue at this point,
		 * it's safe to modify ith_ fields on the thread, as
		 * until it is resumed, it must be off core or in between
		 * the assert wait and returning from the continuation.
		 */

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
#if MACH_FLIPC
			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
				/*
				 * wakeup the peeking thread, but
				 * continue to loop over the threads
				 * waiting on the port's mqueue to see
				 * if there are any actual receivers
				 */
				ipc_mqueue_peek_on_thread_locked(port_mqueue,
				    th->ith_option, th);
			}
#endif /* MACH_FLIPC */

			waitq_resume_identified_thread(wqset, th,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			continue;
		}

		/*
		 * Found a receiver.  See if they can handle the message
		 * correctly (the message is not too large for them, or
		 * they didn't care to be informed that the message was
		 * too large).  If they can't handle it, take them off
		 * the list and let them go back and figure it out and
		 * just move onto the next.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, th->map);
		tsize = ipc_kmsg_trailer_size(th->ith_option, th->map);
		asize = kmsg->ikm_aux_size;

		if (ipc_mqueue_msg_too_large(msize, tsize, asize, th->ith_option,
		    &th->ith_recv_bufs)) {
			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = msize;
			th->ith_asize = asize;
			if (th->ith_option & MACH_RCV_LARGE) {
				/*
				 * let him go without message
				 */
				th->ith_receiver_name = port_mqueue->imq_receiver_name;
				th->ith_kmsg = IKM_NULL;
				th->ith_seqno = 0;

				waitq_resume_identified_thread(wqset, th,
				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

				continue; /* find another thread */
			}
		} else {
			th->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * This thread is going to take this message,
		 * so give it the message.
		 */
		ipc_kmsg_rmqueue(kmsgq, kmsg);

#if MACH_FLIPC
		mach_node_t node = kmsg->ikm_node;
#endif

		ipc_mqueue_release_msgcount(port_mqueue);

		th->ith_kmsg = kmsg;
		th->ith_seqno = port_mqueue->imq_seqno++;

		waitq_resume_identified_thread(wqset, th,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

#if MACH_FLIPC
		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
			flipc_msg_ack(node, port_mqueue, TRUE);
		}
#endif
	}

	return KERN_SUCCESS;
}

/*
 *	Routine:	ipc_port_has_klist
 *	Purpose:
 *		Returns whether the given port's imq_klist field can be used as a klist.
 */
bool
ipc_port_has_klist(ipc_port_t port)
{
	return !port->ip_specialreply &&
	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
}
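
/*
 * Note: on special reply ports, and while a port carries a non-trivial
 * sync link state, the klist storage is reused for sync IPC bookkeeping
 * (see the imq_klist reuse discussion in ipc_mqueue_changed() below),
 * which is why both conditions above must hold before the field may be
 * treated as a klist.
 */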

static inline struct klist *
ipc_object_klist(ipc_object_t object)
{
	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);

		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
	}
	return &ips_object_to_pset(object)->ips_klist;
}

/*
 *	Routine:	ipc_mqueue_changed
 *	Purpose:
 *		Wake up receivers waiting in a message queue.
 *	Conditions:
 *		The object containing the message queue is locked.
 */
void
ipc_mqueue_changed(
	ipc_space_t         space,
	struct waitq        *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing.
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
		 * port already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);
		if (!port->ip_specialreply) {
			ipc_port_adjust_sync_link_state_locked(port,
			    PORT_SYNC_LINK_ANY, NULL);
		}
	} else {
		klist_init(klist);
	}

	/*
	 * Do not pass WAITQ_UPDATE_INHERITOR: ipc_port_destroy()
	 * needs to handle this manually, and the port lock
	 * is the waitq lock, so there's really no inefficiency there.
	 */
	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
}

/*
 *	Routine:	ipc_mqueue_send
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		port is locked.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option64_t option,
	mach_msg_timeout_t  send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 * Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
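
/*
 * Caller-contract sketch for ipc_mqueue_send_locked(): the port is
 * locked on entry and always unlocked on return.  send_timeout is in
 * milliseconds (clock_interval_to_deadline() above scales it by
 * 1000 * NSEC_PER_USEC), so MACH_SEND_TIMEOUT with send_timeout == 500
 * bounds the wait for queue space to roughly half a second.
 */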

/*
 *	Routine:	ipc_mqueue_override_send_locked
 *	Purpose:
 *		Set an override qos on the first message in the queue
 *		(if the queue is full).  This is a send-possible override
 *		that will go away as soon as we drain a message from the
 *		queue.
 *
 *	Conditions:
 *		The port corresponding to mqueue is locked.
 *		The caller holds a reference on the message queue.
 */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t        mqueue,
	mach_msg_qos_t      qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
	}
}

/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.
 *
 *	Conditions:
 *		The port corresponding to the message queue is locked.
 *		The message corresponding to this reference is off the queue.
 *		There is no need to pass reserved preposts because this will
 *		never prepost to anyone.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
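
/*
 * Accounting example for the hand-off above: when a full queue
 * (imq_msgcount == qlimit) has a message removed, the decrement opens
 * one slot; if a blocked sender is actually woken, the increment gives
 * that slot straight to it, so the count keeps reflecting both queued
 * messages and space promised to woken senders.
 */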

/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		port is unlocked
 *		If we need to queue, our space in the message queue is reserved.
 */
static void
ipc_mqueue_post(
	ipc_mqueue_t        mqueue,
	ipc_kmsg_t          kmsg,
	mach_msg_option64_t option __unused)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 * While the msg queue is locked, we have control of the
	 * kmsg, so the ref in it for the port is still good.
	 *
	 * Check for a receiver for the message.
	 */
	ip_mq_lock(port);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		mach_msg_size_t msize, tsize, asize;
		thread_t receiver;

		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread not runnable */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports.  port-set knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount == 0) {
				/*
				 * The message queue must belong
				 * to an inactive port, so just destroy
				 * the message and pretend it was posted.
				 */
				destroy_msg = TRUE;
			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
				/*
				 * queue was not empty and qos
				 * didn't change, nothing to do.
				 */
			} else if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				/*
				 * queue was empty or qos changed
				 * we need to tell kqueue, unless
				 * the space is getting torn down
				 */
				KNOTE(&port->ip_klist, 0);
			}
			break;
		}

#if MACH_FLIPC
		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			break; /* Message was posted, so break out of loop */
		}
#endif /* MACH_FLIPC */

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly.  Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			continue;
		}

		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		tsize = ipc_kmsg_trailer_size(receiver->ith_option, receiver->map);
		asize = kmsg->ikm_aux_size;

		if (ipc_mqueue_msg_too_large(msize, tsize, asize, receiver->ith_option,
		    &receiver->ith_recv_bufs)) {
			receiver->ith_msize = msize;
			receiver->ith_asize = asize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;

		waitq_resume_identified_thread(waitq, receiver,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
	}

	counter_inc(&current_task()->messages_sent);
	return;
}
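
/*
 * To summarize the loop above: a posted kmsg meets one of three fates.
 * It is handed directly to an identified receiver (via ith_kmsg), it is
 * enqueued on imq_messages for a later receive (or peek), or it is
 * destroyed because the port raced to destruction and the reserved
 * msgcount was wiped.
 */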

static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t self = current_thread();
	mach_msg_option64_t option64 = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message.  If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			if (option64 & MACH_RCV_LARGE) {
				return;
			}
			return;
		case MACH_MSG_SUCCESS:
			return;
#if MACH_FLIPC
		case MACH_PEEK_READY:
			return;
#endif /* MACH_FLIPC */

		default:
			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
	}
}

void
ipc_mqueue_receive_continue(
	__unused void       *param,
	wait_result_t       wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}

/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off through the ith_kmsg
 *		field of the thread's receive continuation state.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
 *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
 *					auxiliary data size returned in ith_asize.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 *
 */
void
ipc_mqueue_receive(
	struct waitq        *waitq,
	mach_msg_timeout_t  rcv_timeout,
	int                 interruptible,
	thread_t            thread,
	bool                has_continuation)
{
	wait_result_t wresult;

	assert(thread == current_thread());
	waitq_lock(waitq);

	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, rcv_timeout,
	    interruptible, thread);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (has_continuation) {
			wresult = thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */
		}
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
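
/*
 * Note on the continuation path above: when has_continuation is set,
 * thread_block() is given ipc_mqueue_receive_continue and does not
 * return here; the thread resumes in the continuation with its receive
 * state carried in the ith_ fields, hence the NOTREACHED annotation.
 */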

/*
 *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
 *	Purpose:
 *		Receive a message from a message queue using a specified thread.
 *		If no message available, assert_wait on the appropriate waitq.
 *
 *	Conditions:
 *		Assumes thread is self.
 *		The port/port-set waitq is locked on entry, unlocked on return.
 *		May have assert-waited.  Caller must block in those cases.
 */
wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(
	struct waitq        *waitq,
	mach_msg_timeout_t  rcv_timeout,
	int                 interruptible,
	thread_t            thread)
{
	mach_msg_option64_t option64 = thread->ith_option;
	ipc_object_t object = io_from_waitq(waitq);
	ipc_port_t port = IP_NULL;
	wait_result_t wresult;
	uint64_t deadline;
	struct turnstile *rcv_turnstile = TURNSTILE_NULL;

	assert(thread == current_thread());

	if (waitq_type(waitq) == WQT_PORT_SET) {
		ipc_pset_t pset = ips_object_to_pset(object);
		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
		struct waitq *port_wq;

		/*
		 * Put the message at the back of the prepost list
		 * if it's not a PEEK.
		 *
		 * Might drop the pset lock temporarily.
		 */
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			wqs_flags |= WQS_PREPOST_PEEK;
		}
#endif /* MACH_FLIPC */
		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);

		/* Returns with port locked */

		if (port_wq != NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_wq.  We have instructed the prepost
			 * iteration logic to leave both the port_wq and the
			 * set waitq locked.
			 *
			 * Continue on to handling the message with just
			 * the port waitq locked.
			 */
			io_unlock(object);
			port = ip_from_waitq(port_wq);
		}
	} else if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
			port = IP_NULL;
		}
	} else {
		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
	}

	if (port) {
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
			    option64, thread);
		} else
#endif /* MACH_FLIPC */
		{
			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
			    option64, thread);
		}
		ip_mq_unlock(port);
		return THREAD_NOT_WAITING;
	}

	if (!waitq_is_valid(waitq)) {
		/* someone raced us to destroy this mqueue/port! */
		io_unlock(object);
		/*
		 * ipc_mqueue_receive_results updates the thread's ith_state.
		 * TODO: differentiate between rights being moved and
		 * rights/ports being destroyed (21885327)
		 */
		return THREAD_RESTART;
	}

	/*
	 * Looks like we'll have to block.  The waitq we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
		io_unlock(object);
		thread->ith_state = MACH_RCV_TIMED_OUT;
		return THREAD_NOT_WAITING;
	}

	thread->ith_state = MACH_RCV_IN_PROGRESS;
#if MACH_FLIPC
	if (option64 & MACH64_PEEK_MSG) {
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	}
#endif /* MACH_FLIPC */

	if (option64 & MACH_RCV_TIMEOUT) {
		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
	} else {
		deadline = 0;
	}

	/*
	 * Threads waiting on a reply port (not portset)
	 * will wait on its receive turnstile.
	 *
	 * Donate the waiting thread's turnstile and
	 * set up the inheritor for the special reply port.
	 * Based on the state of the special reply
	 * port, the inheritor would be the send
	 * turnstile of the connection port on which
	 * the send of sync ipc would happen, or the
	 * workloop's turnstile which would reply to
	 * the sync ipc message.
	 *
	 * Pass in the mqueue wait in waitq_assert_wait to
	 * support port set wakeup.  The mqueue waitq of the port
	 * will be converted to a turnstile waitq
	 * in waitq_assert_wait instead of global waitqs.
	 */
	if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		rcv_turnstile = turnstile_prepare((uintptr_t)port,
		    port_rcv_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_recv_update_inheritor(port, rcv_turnstile,
		    TURNSTILE_DELAYED_UPDATE);
	}

	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
	wresult = waitq_assert_wait64_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    interruptible,
	    TIMEOUT_URGENCY_USER_NORMAL,
	    deadline,
	    TIMEOUT_NO_LEEWAY,
	    thread);
	if (wresult == THREAD_AWAKENED) {
		/*
		 * The first thing we did was to look for preposts
		 * (using waitq_set_first_prepost() for sets, or looking
		 * at the port's queue for ports).
		 *
		 * Since we found none, we kept the waitq locked.
		 *
		 * It ensures that waitq_assert_wait64_locked() can't
		 * find pre-posts either, won't drop the waitq lock
		 * either (even for a set), and can't return THREAD_AWAKENED.
		 */
		panic("ipc_mqueue_receive_on_thread: sleep walking");
	}

	io_unlock(object);

	/*
	 * After this point, a waiting thread could be found by the wakeup
	 * identify path, and the other side now owns the ith_ fields until
	 * this thread blocks and resumes in the continuation.
	 */

	/* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
	if (rcv_turnstile != TURNSTILE_NULL) {
		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
	}
	/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */

	return wresult;
}

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_peek_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before it had to block.  Tell a thread about the message queue,
 *		but don't pick off any messages.
 *	Conditions:
 *		port_mq locked
 *		at least one message on port_mq's message queue
 *
 *	Returns: (on thread->ith_state)
 *		MACH_PEEK_READY		ith_peekq contains a message queue
 */
void
ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	__assert_only mach_msg_option64_t option64,
	thread_t            thread)
{
	assert(option64 & MACH64_PEEK_MSG);
	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

	/*
	 * Take a reference on the mqueue's associated port:
	 * the peeking thread will be responsible to release this reference.
	 */
	ip_validate(ip_from_mq(port_mq));
	ip_reference(ip_from_mq(port_mq));
	thread->ith_peekq = port_mq;
	thread->ith_state = MACH_PEEK_READY;
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_select_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before it had to block.  Pick the message off the queue and
 *		"post" it to thread.
 *	Conditions:
 *		port locked.
 *		thread not locked.
 *		There is a message.
 *		No need to reserve prepost objects - it will never prepost.
 *
 *	Returns:
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE	May or may not have pulled it, but it is large.
 */
void
ipc_mqueue_select_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option64_t options,
	thread_t            thread)
{
	mach_msg_size_t msize, tsize, asize;
	ipc_kmsg_t kmsg;

	mach_msg_return_t mr = MACH_MSG_SUCCESS;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).
	 */
	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
	tsize = ipc_kmsg_trailer_size(options, thread->map);
	asize = kmsg->ikm_aux_size;

	if (ipc_mqueue_msg_too_large(msize, tsize, asize, options,
	    &thread->ith_recv_bufs)) {
		mr = MACH_RCV_TOO_LARGE;
		if (options & MACH_RCV_LARGE) {
			(void)ipc_kmsg_validate_signature(kmsg);
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = msize;
			thread->ith_asize = asize;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
	}
#endif
	ipc_mqueue_release_msgcount(port_mq);
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	counter_inc(&current_task()->messages_received);
	return;
}
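
/*
 * Example of the MACH_RCV_LARGE contract above: a receiver with a
 * 256-byte buffer offered a 512-byte message gets MACH_RCV_TOO_LARGE
 * with ith_msize == 512 and the message left queued, so it can retry
 * with a larger buffer.  Without MACH_RCV_LARGE, the message is pulled
 * off the queue anyway and handed to the thread along with the error
 * state.
 */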

/*
 *	Routine:	ipc_mqueue_peek_locked
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The io object corresponding to mq is locked by callers.
 *		Other locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek_locked(
	ipc_mqueue_t            mq,
	mach_port_seqno_t       *seqnop,
	mach_msg_size_t         *msg_sizep,
	mach_msg_id_t           *msg_idp,
	mach_msg_max_trailer_t  *msg_trailerp,
	ipc_kmsg_t              *kmsgp)
{
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno, msgoff;
	unsigned res = 0;
	mach_msg_header_t *hdr;

	seqno = 0;
	if (seqnop != NULL) {
		seqno = *seqnop;
	}

	if (seqno == 0) {
		seqno = mq->imq_seqno;
		msgoff = 0;
	} else if (seqno >= mq->imq_seqno &&
	    seqno < mq->imq_seqno + mq->imq_msgcount) {
		msgoff = seqno - mq->imq_seqno;
	} else {
		goto out;
	}

	/* look for the message that would match that seqno */
	kmsgq = &mq->imq_messages;
	kmsg = ipc_kmsg_queue_first(kmsgq);
	while (msgoff-- && kmsg != IKM_NULL) {
		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
	}
	if (kmsg == IKM_NULL) {
		goto out;
	}

	/*
	 * Validate the kmsg signature before doing anything with it.  Since we are
	 * holding the mqueue lock here, and only header + trailer will be peeked on,
	 * just do a partial validation to finish quickly.
	 *
	 * Partial kmsg signature validation is only supported on PAC devices.
	 */
	(void)ipc_kmsg_validate_signature(kmsg);

	hdr = ikm_header(kmsg);
	/* found one - return the requested info */
	if (seqnop != NULL) {
		*seqnop = seqno;
	}
	if (msg_sizep != NULL) {
		*msg_sizep = hdr->msgh_size;
	}
	if (msg_idp != NULL) {
		*msg_idp = hdr->msgh_id;
	}
	if (msg_trailerp != NULL) {
		*msg_trailerp = *ipc_kmsg_get_trailer(kmsg);
	}
	if (kmsgp != NULL) {
		*kmsgp = kmsg;
	}

	res = 1;

out:
	return res;
}
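
/*
 * Seqno window example: with imq_seqno == 100 and imq_msgcount == 3,
 * the peekable range is seqnos 100..102; passing *seqnop == 0 peeks at
 * the head (seqno 100), and anything outside the window returns 0.
 */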

/*
 *	Routine:	ipc_mqueue_peek
 *	Purpose:
 *		Peek at a (non-set) message queue to see if it has a message
 *		matching the sequence number provided (if zero, then the
 *		first message in the queue) and return vital info about the
 *		message.
 *
 *	Conditions:
 *		The ipc_mqueue_t is unlocked.
 *		Locks may be held by callers, so this routine cannot block.
 *		Caller holds reference on the message queue.
 */
unsigned
ipc_mqueue_peek(
	ipc_mqueue_t            mq,
	mach_port_seqno_t       *seqnop,
	mach_msg_size_t         *msg_sizep,
	mach_msg_id_t           *msg_idp,
	mach_msg_max_trailer_t  *msg_trailerp,
	ipc_kmsg_t              *kmsgp)
{
	ipc_port_t port = ip_from_mq(mq);
	unsigned res;

	ip_mq_lock(port);

	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
	    msg_trailerp, kmsgp);

	ip_mq_unlock(port);
	return res;
}

#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_release_peek_ref
 *	Purpose:
 *		Release the reference on an mqueue's associated port which was
 *		granted to a thread in ipc_mqueue_peek_on_thread (on the
 *		MACH64_PEEK_MSG thread wakeup path).
 *
 *	Conditions:
 *		The ipc_mqueue_t should be locked on entry.
 *		The ipc_mqueue_t will be _unlocked_ on return
 *		(and potentially invalid!)
 *
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);

	ip_mq_lock_held(port);

	/*
	 * clear any preposts this mq may have generated
	 * (which would cause subsequent immediate wakeups)
	 */
	waitq_clear_prepost_locked(&port->ip_waitq);

	ip_mq_unlock(port);

	/*
	 * release the port reference: we need to do this outside the lock
	 * because we might be holding the last port reference!
	 */
	ip_release(port);
}
#endif /* MACH_FLIPC */

/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 * rouse all blocked senders
	 * (don't boost anyone - we're tearing this queue down)
	 * (never preposts)
	 */
	port->ip_fullwaiters = false;

	if (send_turnstile != TURNSTILE_NULL) {
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
	}

#if MACH_FLIPC
	ipc_kmsg_t kmsg;

	cqe_foreach_element_safe(kmsg, &mqueue->imq_messages, ikm_link) {
		if (MACH_NODE_VALID(kmsg->ikm_node) &&
		    FPORT_VALID(mqueue->imq_fport)) {
			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
		}
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 */
	waitq_invalidate(&port->ip_waitq);

	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);

	return reap;
}

/*
 *	Routine:	ipc_mqueue_set_qlimit_locked
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.
 *	Conditions:
 *		Port locked.
 */
void
ipc_mqueue_set_qlimit_locked(
	ipc_mqueue_t            mqueue,
	mach_port_msgcount_t    qlimit)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	/* wake up senders allowed by the new qlimit */
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;
		struct turnstile *send_turnstile = port_send_turnstile(port);

		/* caution: wakeup, qlimit are unsigned */
		wakeup = qlimit - mqueue->imq_qlimit;

		for (i = 0; i < wakeup; i++) {
			/*
			 * boost the priority of the awoken thread
			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
			 * the message queue slot we've just reserved.
			 *
			 * NOTE: this will never prepost
			 */
			if (send_turnstile == TURNSTILE_NULL ||
			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
			    IPC_MQUEUE_FULL,
			    THREAD_AWAKENED,
			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
				port->ip_fullwaiters = false;
				break;
			}
			mqueue->imq_msgcount++;  /* give it to the awakened thread */
		}
	}
	mqueue->imq_qlimit = (uint16_t)qlimit;
}
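
/*
 * Example: raising the limit from 5 to 8 attempts up to three wakeups,
 * crediting imq_msgcount once per sender actually woken so that each
 * one owns the queue slot it was promised.
 */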

/*
 *	Routine:	ipc_mqueue_set_seqno_locked
 *	Purpose:
 *		Changes an mqueue's sequence number.
 *	Conditions:
 *		Caller holds a reference to the queue's containing object.
 */
void
ipc_mqueue_set_seqno_locked(
	ipc_mqueue_t        mqueue,
	mach_port_seqno_t   seqno)
{
	mqueue->imq_seqno = seqno;
}

/*
 *	Routine:	ipc_mqueue_copyin
 *	Purpose:
 *		Convert a name in a space to a message queue.
 *	Conditions:
 *		Nothing locked.  If successful, the caller gets a ref
 *		for the object.  This ref ensures the continued existence of
 *		the queue.
 *	Returns:
 *		MACH_MSG_SUCCESS	Found a message queue.
 *		MACH_RCV_INVALID_NAME	The space is dead.
 *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
 *		MACH_RCV_INVALID_NAME
 *			The denoted right is not receive or port set.
 *		MACH_RCV_IN_SET		Receive right is a member of a set.
 */
mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t         space,
	mach_port_name_t    name,
	ipc_object_t        *objectp)
{
	ipc_entry_bits_t bits;
	ipc_object_t object;
	kern_return_t kr;

	kr = ipc_right_lookup_read(space, name, &bits, &object);
	if (kr != KERN_SUCCESS) {
		return MACH_RCV_INVALID_NAME;
	}
	/* object is locked and active */

	if (bits & MACH_PORT_TYPE_RECEIVE) {
		__assert_only ipc_port_t port = ip_object_to_port(object);
		assert(ip_get_receiver_name(port) == name);
		assert(ip_in_space(port, space));
	}
	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
		io_reference(object);
		io_unlock(object);
	} else {
		io_unlock(object);
		/* guard exception if we never held the receive right in this entry */
		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
		}
		return MACH_RCV_INVALID_NAME;
	}

	*objectp = object;
	return MACH_MSG_SUCCESS;
}