typedef struct _IODataQueueEntry {
    uint32_t size;
    uint8_t data[0];
} IODataQueueEntry;

#define DATA_QUEUE_ENTRY_HEADER_SIZE sizeof(IODataQueueEntry)
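// Entry payloads are rounded up to a 4-byte boundary before the header size is
// added: for example, a 5-byte payload occupies ((5 + 3) & ~3) = 8 data bytes
// plus DATA_QUEUE_ENTRY_HEADER_SIZE (4 bytes), i.e. a 12-byte entry. The same
// rounding appears below as os_add_overflow(3, ...) followed by `&= ~3U`.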

typedef struct _IODataQueueMemory {
    volatile uint32_t head;
    volatile uint32_t tail;
    volatile uint8_t needServicedCallback;
    volatile uint8_t _resv[119];
    IODataQueueEntry queue[0];
} IODataQueueMemory;
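// Shared ring-buffer layout, as used throughout this file: head and tail are
// byte offsets into queue[]. The consumer (Peek/Dequeue) reads entries at head
// and advances it; the producer (Enqueue) writes entries at tail and advances
// it. head == tail means the queue is empty, and the tail is never allowed to
// catch up to the head, so a full queue is never confused with an empty one.
// When an enqueue fails for lack of space, needServicedCallback is set so the
// consumer sends a DataServiced notification once it frees room.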

struct IODataQueueDispatchSource_IVars {
    IODataQueueMemory * dataQueue;
    IODataQueueDispatchSource * source;
    // IODispatchQueue * queue;
    IOMemoryDescriptor * memory;
    OSAction * dataAvailableAction;
    OSAction * dataServicedAction;
    uint64_t options;
    uint32_t queueByteCount;

#if !KERNEL
    bool enable;
    bool canceled;
#endif
};

bool
IODataQueueDispatchSource::init()
{
    if (!super::init()) {
        return false;
    }

    ivars = IONewZero(IODataQueueDispatchSource_IVars, 1);
    ivars->source = this;

#if !KERNEL
    kern_return_t ret;

    ret = CopyMemory(&ivars->memory);
    assert(kIOReturnSuccess == ret);

    uint64_t address;
    uint64_t length;

    ret = ivars->memory->Map(0, 0, 0, 0, &address, &length);
    assert(kIOReturnSuccess == ret);
    ivars->dataQueue = (typeof(ivars->dataQueue))(uintptr_t) address;
    ivars->queueByteCount = length;
#endif

    return true;
}

kern_return_t
IODataQueueDispatchSource::CheckForWork_Impl(
    const IORPC rpc,
    bool synchronous)
{
    IOReturn ret = kIOReturnNotReady;

    return ret;
}

#if KERNEL

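// Kernel-side factory: queueByteCount must be a multiple of 4 and fit in a
// uint32_t, and the backing buffer is allocated page-aligned and shared
// between kernel and user space. A minimal usage sketch (illustrative only;
// `myDispatchQueue` stands for whatever IODispatchQueue the caller owns, and
// the call goes through the generated Create() wrapper rather than
// Create_Impl() directly):
//
//     IODataQueueDispatchSource * source = NULL;
//     kern_return_t ret = IODataQueueDispatchSource::Create(16 * 1024,
//         myDispatchQueue, &source);
//     if (kIOReturnSuccess != ret) {
//         // handle the failure; no source was created
//     }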
kern_return_t
IODataQueueDispatchSource::Create_Impl(
    uint64_t queueByteCount,
    IODispatchQueue * queue,
    IODataQueueDispatchSource ** source)
{
    IODataQueueDispatchSource * inst;
    IOBufferMemoryDescriptor * bmd;

    if (3 & queueByteCount) {
        return kIOReturnBadArgument;
    }
    if (queueByteCount > UINT_MAX) {
        return kIOReturnBadArgument;
    }
    inst = OSTypeAlloc(IODataQueueDispatchSource);
    if (!inst) {
        return kIOReturnNoMemory;
    }
    if (!inst->init()) {
        inst->release();
        return kIOReturnError;
    }

    bmd = IOBufferMemoryDescriptor::withOptions(
        kIODirectionOutIn | kIOMemoryKernelUserShared,
        queueByteCount, page_size);
    if (!bmd) {
        inst->release();
        return kIOReturnNoMemory;
    }
    inst->ivars->memory = bmd;
    inst->ivars->queueByteCount = ((uint32_t) queueByteCount);
    inst->ivars->options = 0;
    inst->ivars->dataQueue = (typeof(inst->ivars->dataQueue))bmd->getBytesNoCopy();

    *source = inst;

    return kIOReturnSuccess;
}

kern_return_t
IODataQueueDispatchSource::CopyMemory_Impl(
    IOMemoryDescriptor ** memory)
{
    kern_return_t ret;
    IOMemoryDescriptor * result;

    result = ivars->memory;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *memory = result;

    return ret;
}

kern_return_t
IODataQueueDispatchSource::CopyDataAvailableHandler_Impl(
    OSAction ** action)
{
    kern_return_t ret;
    OSAction * result;

    result = ivars->dataAvailableAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;

    return ret;
}

kern_return_t
IODataQueueDispatchSource::CopyDataServicedHandler_Impl(
    OSAction ** action)
{
    kern_return_t ret;
    OSAction * result;

    result = ivars->dataServicedAction;
    if (result) {
        result->retain();
        ret = kIOReturnSuccess;
    } else {
        ret = kIOReturnNotReady;
    }
    *action = result;
    return ret;
}

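// Replaces the data-available handler. The previous action, if any, is cleared
// with an atomic swap and released, and if data is already pending the new
// handler is invoked immediately so a registration that races with an enqueue
// is not missed.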
kern_return_t
IODataQueueDispatchSource::SetDataAvailableHandler_Impl(
    OSAction * action)
{
    IOReturn ret;
    OSAction * oldAction;

    oldAction = ivars->dataAvailableAction;
    if (oldAction && OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataAvailableAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataAvailableAction = action;
        if (IsDataAvailable()) {
            DataAvailable(ivars->dataAvailableAction);
        }
    }
    ret = kIOReturnSuccess;

    return ret;
}

kern_return_t
IODataQueueDispatchSource::SetDataServicedHandler_Impl(
    OSAction * action)
{
    IOReturn ret;
    OSAction * oldAction;

    oldAction = ivars->dataServicedAction;
    if (oldAction && OSCompareAndSwapPtr(oldAction, NULL, &ivars->dataServicedAction)) {
        oldAction->release();
    }
    if (action) {
        action->retain();
        ivars->dataServicedAction = action;
    }
    ret = kIOReturnSuccess;

    return ret;
}

#endif /* KERNEL */

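// SendDataAvailable() and SendDataServiced() resolve their OSAction lazily:
// if no handler is cached locally yet, the remote copy is fetched via
// CopyDataAvailableHandler()/CopyDataServicedHandler() before the notification
// is delivered.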
void
IODataQueueDispatchSource::SendDataAvailable(void)
{
    IOReturn ret;

    if (!ivars->dataAvailableAction) {
        ret = CopyDataAvailableHandler(&ivars->dataAvailableAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataAvailableAction = NULL;
        }
    }
    if (ivars->dataAvailableAction) {
        DataAvailable(ivars->dataAvailableAction);
    }
}

void
IODataQueueDispatchSource::SendDataServiced(void)
{
    IOReturn ret;

    if (!ivars->dataServicedAction) {
        ret = CopyDataServicedHandler(&ivars->dataServicedAction);
        if (kIOReturnSuccess != ret) {
            ivars->dataServicedAction = NULL;
        }
    }
    if (ivars->dataServicedAction) {
        ivars->dataQueue->needServicedCallback = false;
        DataServiced(ivars->dataServicedAction);
    }
}

kern_return_t
IODataQueueDispatchSource::SetEnableWithCompletion_Impl(
    bool enable,
    IODispatchSourceCancelHandler handler)
{
    IOReturn ret;

#if !KERNEL
    ivars->enable = enable;
#endif

    ret = kIOReturnSuccess;
    return ret;
}

void
IODataQueueDispatchSource::free()
{
    OSSafeReleaseNULL(ivars->memory);
    OSSafeReleaseNULL(ivars->dataAvailableAction);
    OSSafeReleaseNULL(ivars->dataServicedAction);
    IOSafeDeleteNULL(ivars, IODataQueueDispatchSource_IVars, 1);
    super::free();
}

kern_return_t
IODataQueueDispatchSource::Cancel_Impl(
    IODispatchSourceCancelHandler handler)
{
#if !KERNEL
    if (handler) {
        handler();
    }
#endif
    return kIOReturnSuccess;
}

bool
IODataQueueDispatchSource::IsDataAvailable(void)
{
    IODataQueueMemory *dataQueue = ivars->dataQueue;

    return dataQueue && (dataQueue->head != dataQueue->tail);
}

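// Peek() hands the next entry's payload to the callback without advancing
// head, so a later Dequeue() sees the same entry. A minimal usage sketch
// (illustrative only; the block parameters are assumed to follow the
// IODataQueueClientDequeueEntryBlock shape implied by the
// callback(&entry->data, callerDataSize) calls below):
//
//     kern_return_t ret = source->Peek(^(const void * data, size_t dataSize) {
//         // Inspect data/dataSize here; the entry stays in the queue.
//     });
//     // ret is kIOReturnUnderrun when the queue is empty.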
kern_return_t
IODataQueueDispatchSource::Peek(IODataQueueClientDequeueEntryBlock callback)
{
    IODataQueueEntry * entry = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t callerDataSize;
    uint32_t dataSize;
    uint32_t headOffset;
    uint32_t tailOffset;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry * head = NULL;
        uint32_t headSize = 0;
        uint32_t queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }
        headSize &= ~3U;

        // Check if there's enough room before the end of the queue for a header.
        // If there is room, check if there's enough room to hold the header and
        // the data.

        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // No room for the header or the data, wrap to the beginning of the queue.
            // Note: wrapping even with the UINT32_MAX checks, as we have to support
            // queueSize of UINT32_MAX
            entry = dataQueue->queue;
            callerDataSize = entry->size;
            dataSize = entry->size;
            if (os_add_overflow(3, callerDataSize, &dataSize)) {
                return kIOReturnError;
            }
            dataSize &= ~3U;

            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }

            callback(&entry->data, callerDataSize);
            return kIOReturnSuccess;
        } else {
            callback(&head->data, callerDataSize);
            return kIOReturnSuccess;
        }
    }

    return kIOReturnUnderrun;
}

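// Dequeue() consumes the next entry, passing its payload to the callback, and
// sends a DataServiced notification afterwards if the producer asked for one.
// A minimal usage sketch (illustrative only; block shape assumed as in Peek()):
//
//     kern_return_t ret;
//     do {
//         ret = source->Dequeue(^(const void * data, size_t dataSize) {
//             // Copy or process the payload before the block returns; the
//             // storage belongs to the shared ring buffer.
//         });
//     } while (kIOReturnSuccess == ret);
//     // kIOReturnUnderrun means the queue has been drained.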
kern_return_t
IODataQueueDispatchSource::Dequeue(IODataQueueClientDequeueEntryBlock callback)
{
    kern_return_t ret;
    bool sendDataServiced;

    sendDataServiced = false;
    ret = DequeueWithCoalesce(&sendDataServiced, callback);
    if (sendDataServiced) {
        SendDataServiced();
    }
    return ret;
}

kern_return_t
IODataQueueDispatchSource::DequeueWithCoalesce(bool * sendDataServiced,
    IODataQueueClientDequeueEntryBlock callback)
{
    IOReturn retVal = kIOReturnSuccess;
    IODataQueueEntry * entry = NULL;
    IODataQueueMemory * dataQueue;
    uint32_t callerDataSize;
    uint32_t dataSize = 0;
    uint32_t headOffset = 0;
    uint32_t tailOffset = 0;
    uint32_t newHeadOffset = 0;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }

    // Read head and tail with acquire barrier
    headOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
    tailOffset = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_ACQUIRE);

    if (headOffset != tailOffset) {
        IODataQueueEntry * head = NULL;
        uint32_t headSize = 0;
        uint32_t queueSize = ivars->queueByteCount;

        if (headOffset > queueSize) {
            return kIOReturnError;
        }

        head = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + headOffset);
        callerDataSize = head->size;
        if (os_add_overflow(3, callerDataSize, &headSize)) {
            return kIOReturnError;
        }
        headSize &= ~3U;

        // If this branch is taken, the producer wrapped around to the
        // beginning, so read from there. The entry did not fit at the end
        // either because there was not even room for the header
        if ((headOffset > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize) ||
            // or because there was room for the header, but not for the data.
            (headOffset + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headSize) ||
            (headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
            // Note: we have to wrap to the beginning even with the UINT32_MAX checks
            // because we have to support a queueSize of UINT32_MAX.
            entry = dataQueue->queue;
            callerDataSize = entry->size;

            if (os_add_overflow(callerDataSize, 3, &dataSize)) {
                return kIOReturnError;
            }
            dataSize &= ~3U;
            if ((dataSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = dataSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
        } else {
            // Otherwise the entry is in place at the current head offset.
            entry = head;

            if ((headSize > UINT32_MAX - DATA_QUEUE_ENTRY_HEADER_SIZE) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE > UINT32_MAX - headOffset) ||
                (headSize + DATA_QUEUE_ENTRY_HEADER_SIZE + headOffset > queueSize)) {
                return kIOReturnError;
            }
            newHeadOffset = headOffset + headSize + DATA_QUEUE_ENTRY_HEADER_SIZE;
        }
    } else {
        // empty queue
        if (dataQueue->needServicedCallback) {
            *sendDataServiced = true;
        }
        return kIOReturnUnderrun;
    }

    callback(&entry->data, callerDataSize);
    if (dataQueue->needServicedCallback) {
        *sendDataServiced = true;
    }

    __c11_atomic_store((_Atomic uint32_t *)&dataQueue->head, newHeadOffset, __ATOMIC_RELEASE);

    if (newHeadOffset == tailOffset) {
        //
        // If we are making the queue empty, then we need to make sure
        // that either the enqueuer notices, or we notice the enqueue
        // that raced with our making of the queue empty.
        //
        __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
    }

    return retVal;
}

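// Enqueue() reserves space for callerDataSize bytes, lets the callback fill
// the entry in place, and requests a DataAvailable notification when the queue
// transitions from empty to non-empty; on overrun it sets needServicedCallback
// so the producer learns, via DataServiced, when a dequeue frees space. A
// minimal usage sketch (illustrative only; MyEvent/event are placeholders and
// the block shape mirrors the callback(&entry->data, callerDataSize) calls
// below):
//
//     kern_return_t ret = source->Enqueue(sizeof(MyEvent),
//         ^(void * data, size_t dataSize) {
//             memcpy(data, &event, sizeof(MyEvent));
//         });
//     if (kIOReturnOverrun == ret) {
//         // Queue full; retry after a DataServiced notification.
//     }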
kern_return_t
IODataQueueDispatchSource::Enqueue(uint32_t callerDataSize,
    IODataQueueClientEnqueueEntryBlock callback)
{
    kern_return_t ret;
    bool sendDataAvailable;

    sendDataAvailable = false;
    ret = EnqueueWithCoalesce(callerDataSize, &sendDataAvailable, callback);
    if (sendDataAvailable) {
        SendDataAvailable();
    }
    return ret;
}

kern_return_t
IODataQueueDispatchSource::EnqueueWithCoalesce(uint32_t callerDataSize,
    bool * sendDataAvailable,
    IODataQueueClientEnqueueEntryBlock callback)
{
    IODataQueueMemory * dataQueue;
    IODataQueueEntry * entry;
    uint32_t head;
    uint32_t tail;
    uint32_t newTail;
    uint32_t dataSize;
    uint32_t queueSize;
    uint32_t entrySize;
    IOReturn retVal = kIOReturnSuccess;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }
    queueSize = ivars->queueByteCount;

    // Force a single read of head and tail
    tail = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_RELAXED);
    head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_ACQUIRE);

    if (os_add_overflow(callerDataSize, 3, &dataSize)) {
        return kIOReturnOverrun;
    }
    dataSize &= ~3U;

    // Check for overflow of entrySize
    if (os_add_overflow(DATA_QUEUE_ENTRY_HEADER_SIZE, dataSize, &entrySize)) {
        return kIOReturnOverrun;
    }

    // Check for underflow of (queueSize - tail)
    if (queueSize < tail || queueSize < head) {
        return kIOReturnUnderrun;
    }

    newTail = tail;
    if (tail >= head) {
        // Is there enough room at the end for the entry?
        if ((entrySize <= (UINT32_MAX - tail)) &&
            ((tail + entrySize) <= queueSize)) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            // The tail can be out of bound when the size of the new entry
            // exactly matches the available space at the end of the queue.
            // The tail can range from 0 to queueSize inclusive.

            newTail = tail + entrySize;
        } else if (head > entrySize) { // Is there enough room at the beginning?
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue);

            callback(&entry->data, callerDataSize);

            // Wrap around to the beginning, but do not allow the tail to catch
            // up to the head.

            entry->size = callerDataSize;

            // We need to make sure that there is enough room to set the size before
            // doing this. The user client checks for this and will look for the size
            // at the beginning if there isn't room for it at the end.

            if ((queueSize - tail) >= DATA_QUEUE_ENTRY_HEADER_SIZE) {
                ((IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail))->size = dataSize;
            }

            newTail = entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    } else {
        // Do not allow the tail to catch up to the head when the queue is full.
        // That's why the comparison uses a '>' rather than '>='.

        if ((head - tail) > entrySize) {
            entry = (IODataQueueEntry *)((uintptr_t)dataQueue->queue + tail);

            callback(&entry->data, callerDataSize);

            entry->size = callerDataSize;

            newTail = tail + entrySize;
        } else {
            retVal = kIOReturnOverrun; // queue is full
        }
    }

    // Send notification (via mach message) that data is available.

    if (retVal == kIOReturnSuccess) {
        // Publish the data we just enqueued
        __c11_atomic_store((_Atomic uint32_t *)&dataQueue->tail, newTail, __ATOMIC_RELEASE);

        if (tail != head) {
            //
            // The memory barrier below pairs with the one in dequeue
            // so that either our store to the tail cannot be missed by
            // the next dequeue attempt, or we will observe the dequeuer
            // making the queue empty.
            //
            // Of course, if we already think the queue is empty,
            // there's no point paying this extra cost.
            //
            __c11_atomic_thread_fence(__ATOMIC_SEQ_CST);
            head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_RELAXED);
        }

        if (tail == head) {
            // Send notification that data is now available.
            *sendDataAvailable = true;
            retVal = kIOReturnSuccess;
        }
    } else if (retVal == kIOReturnOverrun) {
        // ask to be notified of Dequeue()
        dataQueue->needServicedCallback = true;
        *sendDataAvailable = true;
    }

    return retVal;
}

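// CanEnqueueData() reports whether dataCount entries of callerDataSize bytes
// each would currently fit, using the same rounding and header accounting as
// Enqueue(); it returns kIOReturnOverrun when they would not, and does not
// modify the queue.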
kern_return_t
IODataQueueDispatchSource::CanEnqueueData(uint32_t callerDataSize)
{
    return CanEnqueueData(callerDataSize, 1);
}

kern_return_t
IODataQueueDispatchSource::CanEnqueueData(uint32_t callerDataSize, uint32_t dataCount)
{
    IODataQueueMemory * dataQueue;
    uint32_t head;
    uint32_t tail;
    uint32_t dataSize;
    uint32_t queueSize;
    uint32_t entrySize;

    dataQueue = ivars->dataQueue;
    if (!dataQueue) {
        return kIOReturnNoMemory;
    }
    queueSize = ivars->queueByteCount;

    // Force a single read of head and tail
    tail = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->tail, __ATOMIC_RELAXED);
    head = __c11_atomic_load((_Atomic uint32_t *)&dataQueue->head, __ATOMIC_ACQUIRE);

    if (os_add_overflow(callerDataSize, 3, &dataSize)) {
        return kIOReturnOverrun;
    }
    dataSize &= ~3U;

    // Check for overflow of entrySize
    if (os_add_overflow(DATA_QUEUE_ENTRY_HEADER_SIZE, dataSize, &entrySize)) {
        return kIOReturnOverrun;
    }

    // Check for underflow of (queueSize - tail)
    if (queueSize < tail || queueSize < head) {
        return kIOReturnError;
    }

    if (tail >= head) {
        uint32_t endSpace = queueSize - tail;
        uint32_t endElements = endSpace / entrySize;
        uint32_t beginElements = head / entrySize;
        if (endElements < dataCount && endElements + beginElements <= dataCount) {
            return kIOReturnOverrun;
        }
    } else {
        // Do not allow the tail to catch up to the head when the queue is full.
        uint32_t space = head - tail - 1;
        uint32_t elements = space / entrySize;
        if (elements < dataCount) {
            return kIOReturnOverrun;
        }
    }

    return kIOReturnSuccess;
}

size_t
IODataQueueDispatchSource::GetDataQueueEntryHeaderSize()
{
    return DATA_QUEUE_ENTRY_HEADER_SIZE;
}