1 /*
2 * Copyright 2009-2015 Samy Al Bahra.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27 #ifndef CK_RING_H
28 #define CK_RING_H
29
30 #include <ck_cc.h>
31 #include <ck_md.h>
32 #include <ck_pr.h>
33 #include <ck_stdbool.h>
34 #include <ck_string.h>
35
36 /*
37 * Concurrent ring buffer.
38 */
39
40 struct ck_ring {
41 unsigned int c_head;
42 char pad[CK_MD_CACHELINE - sizeof(unsigned int)];
43 unsigned int p_tail;
44 unsigned int p_head;
45 char _pad[CK_MD_CACHELINE - sizeof(unsigned int) * 2];
46 unsigned int size;
47 unsigned int mask;
48 };
49 typedef struct ck_ring ck_ring_t;
50
/*
 * One slot of a pointer ring. The pointer-based public interface
 * (ck_ring_*_spsc, _spmc, _mpsc, _mpmc) copies sizeof(void *) bytes in
 * and out of an array of these.
 */
struct ck_ring_buffer {
	void *value;
};
typedef struct ck_ring_buffer ck_ring_buffer_t;
55
56 CK_CC_INLINE static unsigned int
ck_ring_size(const struct ck_ring * ring)57 ck_ring_size(const struct ck_ring *ring)
58 {
59 unsigned int c, p;
60
61 c = ck_pr_load_uint(&ring->c_head);
62 p = ck_pr_load_uint(&ring->p_tail);
63 return (p - c) & ring->mask;
64 }
65
66 CK_CC_INLINE static unsigned int
ck_ring_capacity(const struct ck_ring * ring)67 ck_ring_capacity(const struct ck_ring *ring)
68 {
69 return ring->size;
70 }
71
72 CK_CC_INLINE static void
ck_ring_init(struct ck_ring * ring,unsigned int size)73 ck_ring_init(struct ck_ring *ring, unsigned int size)
74 {
75
76 ring->size = size;
77 ring->mask = size - 1;
78 ring->p_tail = 0;
79 ring->p_head = 0;
80 ring->c_head = 0;
81 return;
82 }
83
/*
 * The _ck_ring_* namespace is internal only and must not be used externally.
 */
87 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp(struct ck_ring * ring,void * CK_CC_RESTRICT buffer,const void * CK_CC_RESTRICT entry,unsigned int ts,unsigned int * size)88 _ck_ring_enqueue_sp(struct ck_ring *ring,
89 void *CK_CC_RESTRICT buffer,
90 const void *CK_CC_RESTRICT entry,
91 unsigned int ts,
92 unsigned int *size)
93 {
94 const unsigned int mask = ring->mask;
95 unsigned int consumer, producer, delta;
96
97 consumer = ck_pr_load_uint(&ring->c_head);
98 producer = ring->p_tail;
99 delta = producer + 1;
100 if (size != NULL)
101 *size = (producer - consumer) & mask;
102
103 if (CK_CC_UNLIKELY((delta & mask) == (consumer & mask)))
104 return false;
105
106 buffer = (char *)buffer + ts * (producer & mask);
107 memcpy(buffer, entry, ts);
108
109 /*
110 * Make sure to update slot value before indicating
111 * that the slot is available for consumption.
112 */
113 ck_pr_fence_store();
114 ck_pr_store_uint(&ring->p_tail, delta);
115 return true;
116 }
117
118 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_sp_size(struct ck_ring * ring,void * CK_CC_RESTRICT buffer,const void * CK_CC_RESTRICT entry,unsigned int ts,unsigned int * size)119 _ck_ring_enqueue_sp_size(struct ck_ring *ring,
120 void *CK_CC_RESTRICT buffer,
121 const void *CK_CC_RESTRICT entry,
122 unsigned int ts,
123 unsigned int *size)
124 {
125 unsigned int sz;
126 bool r;
127
128 r = _ck_ring_enqueue_sp(ring, buffer, entry, ts, &sz);
129 *size = sz;
130 return r;
131 }
132
133 CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_sc(struct ck_ring * ring,const void * CK_CC_RESTRICT buffer,void * CK_CC_RESTRICT target,unsigned int size)134 _ck_ring_dequeue_sc(struct ck_ring *ring,
135 const void *CK_CC_RESTRICT buffer,
136 void *CK_CC_RESTRICT target,
137 unsigned int size)
138 {
139 const unsigned int mask = ring->mask;
140 unsigned int consumer, producer;
141
142 consumer = ring->c_head;
143 producer = ck_pr_load_uint(&ring->p_tail);
144
145 if (CK_CC_UNLIKELY(consumer == producer))
146 return false;
147
148 /*
149 * Make sure to serialize with respect to our snapshot
150 * of the producer counter.
151 */
152 ck_pr_fence_load();
153
154 buffer = (const char *)buffer + size * (consumer & mask);
155 memcpy(target, buffer, size);
156
157 /*
158 * Make sure copy is completed with respect to consumer
159 * update.
160 */
161 ck_pr_fence_store();
162 ck_pr_store_uint(&ring->c_head, consumer + 1);
163 return true;
164 }
165
166 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp(struct ck_ring * ring,void * buffer,const void * entry,unsigned int ts,unsigned int * size)167 _ck_ring_enqueue_mp(struct ck_ring *ring,
168 void *buffer,
169 const void *entry,
170 unsigned int ts,
171 unsigned int *size)
172 {
173 const unsigned int mask = ring->mask;
174 unsigned int producer, consumer, delta;
175 bool r = true;
176
177 producer = ck_pr_load_uint(&ring->p_head);
178
179 for (;;) {
180 /*
181 * The snapshot of producer must be up to date with respect to
182 * consumer.
183 */
184 ck_pr_fence_load();
185 consumer = ck_pr_load_uint(&ring->c_head);
186
187 delta = producer + 1;
188
189 /*
190 * Only try to CAS if the producer is not clearly stale (not
191 * less than consumer) and the buffer is definitely not full.
192 */
193 if (CK_CC_LIKELY((producer - consumer) < mask)) {
194 if (ck_pr_cas_uint_value(&ring->p_head,
195 producer, delta, &producer) == true) {
196 break;
197 }
198 } else {
199 unsigned int new_producer;
200
201 /*
202 * Slow path. Either the buffer is full or we have a
203 * stale snapshot of p_head. Execute a second read of
204 * p_read that must be ordered wrt the snapshot of
205 * c_head.
206 */
207 ck_pr_fence_load();
208 new_producer = ck_pr_load_uint(&ring->p_head);
209
210 /*
211 * Only fail if we haven't made forward progress in
212 * production: the buffer must have been full when we
213 * read new_producer (or we wrapped around UINT_MAX
214 * during this iteration).
215 */
216 if (producer == new_producer) {
217 r = false;
218 goto leave;
219 }
220
221 /*
222 * p_head advanced during this iteration. Try again.
223 */
224 producer = new_producer;
225 }
226 }
227
228 buffer = (char *)buffer + ts * (producer & mask);
229 memcpy(buffer, entry, ts);
230
231 /*
232 * Wait until all concurrent producers have completed writing
233 * their data into the ring buffer.
234 */
235 while (ck_pr_load_uint(&ring->p_tail) != producer)
236 ck_pr_stall();
237
238 /*
239 * Ensure that copy is completed before updating shared producer
240 * counter.
241 */
242 ck_pr_fence_store();
243 ck_pr_store_uint(&ring->p_tail, delta);
244
245 leave:
246 if (size != NULL)
247 *size = (producer - consumer) & mask;
248
249 return r;
250 }
251
252 CK_CC_FORCE_INLINE static bool
_ck_ring_enqueue_mp_size(struct ck_ring * ring,void * buffer,const void * entry,unsigned int ts,unsigned int * size)253 _ck_ring_enqueue_mp_size(struct ck_ring *ring,
254 void *buffer,
255 const void *entry,
256 unsigned int ts,
257 unsigned int *size)
258 {
259 unsigned int sz;
260 bool r;
261
262 r = _ck_ring_enqueue_mp(ring, buffer, entry, ts, &sz);
263 *size = sz;
264 return r;
265 }
266
267 CK_CC_FORCE_INLINE static bool
_ck_ring_trydequeue_mc(struct ck_ring * ring,const void * buffer,void * data,unsigned int size)268 _ck_ring_trydequeue_mc(struct ck_ring *ring,
269 const void *buffer,
270 void *data,
271 unsigned int size)
272 {
273 const unsigned int mask = ring->mask;
274 unsigned int consumer, producer;
275
276 consumer = ck_pr_load_uint(&ring->c_head);
277 ck_pr_fence_load();
278 producer = ck_pr_load_uint(&ring->p_tail);
279
280 if (CK_CC_UNLIKELY(consumer == producer))
281 return false;
282
283 ck_pr_fence_load();
284
285 buffer = (const char *)buffer + size * (consumer & mask);
286 memcpy(data, buffer, size);
287
288 ck_pr_fence_store_atomic();
289 return ck_pr_cas_uint(&ring->c_head, consumer, consumer + 1);
290 }
291
292 CK_CC_FORCE_INLINE static bool
_ck_ring_dequeue_mc(struct ck_ring * ring,const void * buffer,void * data,unsigned int ts)293 _ck_ring_dequeue_mc(struct ck_ring *ring,
294 const void *buffer,
295 void *data,
296 unsigned int ts)
297 {
298 const unsigned int mask = ring->mask;
299 unsigned int consumer, producer;
300
301 consumer = ck_pr_load_uint(&ring->c_head);
302
303 do {
304 const char *target;
305
306 /*
307 * Producer counter must represent state relative to
308 * our latest consumer snapshot.
309 */
310 ck_pr_fence_load();
311 producer = ck_pr_load_uint(&ring->p_tail);
312
313 if (CK_CC_UNLIKELY(consumer == producer))
314 return false;
315
316 ck_pr_fence_load();
317
318 target = (const char *)buffer + ts * (consumer & mask);
319 memcpy(data, target, ts);
320
321 /* Serialize load with respect to head update. */
322 ck_pr_fence_store_atomic();
323 } while (ck_pr_cas_uint_value(&ring->c_head,
324 consumer,
325 consumer + 1,
326 &consumer) == false);
327
328 return true;
329 }
330
331 /*
332 * The ck_ring_*_spsc namespace is the public interface for interacting with a
333 * ring buffer containing pointers. Correctness is only provided if there is up
334 * to one concurrent consumer and up to one concurrent producer.
335 */
336 CK_CC_INLINE static bool
ck_ring_enqueue_spsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)337 ck_ring_enqueue_spsc_size(struct ck_ring *ring,
338 struct ck_ring_buffer *buffer,
339 const void *entry,
340 unsigned int *size)
341 {
342
343 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
344 sizeof(entry), size);
345 }
346
347 CK_CC_INLINE static bool
ck_ring_enqueue_spsc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)348 ck_ring_enqueue_spsc(struct ck_ring *ring,
349 struct ck_ring_buffer *buffer,
350 const void *entry)
351 {
352
353 return _ck_ring_enqueue_sp(ring, buffer,
354 &entry, sizeof(entry), NULL);
355 }
356
357 CK_CC_INLINE static bool
ck_ring_dequeue_spsc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)358 ck_ring_dequeue_spsc(struct ck_ring *ring,
359 const struct ck_ring_buffer *buffer,
360 void *data)
361 {
362
363 return _ck_ring_dequeue_sc(ring, buffer,
364 (void **)data, sizeof(void *));
365 }
366
367 /*
368 * The ck_ring_*_mpmc namespace is the public interface for interacting with a
369 * ring buffer containing pointers. Correctness is provided for any number of
370 * producers and consumers.
371 */
372 CK_CC_INLINE static bool
ck_ring_enqueue_mpmc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)373 ck_ring_enqueue_mpmc(struct ck_ring *ring,
374 struct ck_ring_buffer *buffer,
375 const void *entry)
376 {
377
378 return _ck_ring_enqueue_mp(ring, buffer, &entry,
379 sizeof(entry), NULL);
380 }
381
382 CK_CC_INLINE static bool
ck_ring_enqueue_mpmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)383 ck_ring_enqueue_mpmc_size(struct ck_ring *ring,
384 struct ck_ring_buffer *buffer,
385 const void *entry,
386 unsigned int *size)
387 {
388
389 return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
390 sizeof(entry), size);
391 }
392
393 CK_CC_INLINE static bool
ck_ring_trydequeue_mpmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)394 ck_ring_trydequeue_mpmc(struct ck_ring *ring,
395 const struct ck_ring_buffer *buffer,
396 void *data)
397 {
398
399 return _ck_ring_trydequeue_mc(ring,
400 buffer, (void **)data, sizeof(void *));
401 }
402
403 CK_CC_INLINE static bool
ck_ring_dequeue_mpmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)404 ck_ring_dequeue_mpmc(struct ck_ring *ring,
405 const struct ck_ring_buffer *buffer,
406 void *data)
407 {
408
409 return _ck_ring_dequeue_mc(ring, buffer, (void **)data,
410 sizeof(void *));
411 }
412
413 /*
414 * The ck_ring_*_spmc namespace is the public interface for interacting with a
415 * ring buffer containing pointers. Correctness is provided for any number of
416 * consumers with up to one concurrent producer.
417 */
418 CK_CC_INLINE static bool
ck_ring_enqueue_spmc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)419 ck_ring_enqueue_spmc_size(struct ck_ring *ring,
420 struct ck_ring_buffer *buffer,
421 const void *entry,
422 unsigned int *size)
423 {
424
425 return _ck_ring_enqueue_sp_size(ring, buffer, &entry,
426 sizeof(entry), size);
427 }
428
429 CK_CC_INLINE static bool
ck_ring_enqueue_spmc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)430 ck_ring_enqueue_spmc(struct ck_ring *ring,
431 struct ck_ring_buffer *buffer,
432 const void *entry)
433 {
434
435 return _ck_ring_enqueue_sp(ring, buffer, &entry,
436 sizeof(entry), NULL);
437 }
438
439 CK_CC_INLINE static bool
ck_ring_trydequeue_spmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)440 ck_ring_trydequeue_spmc(struct ck_ring *ring,
441 const struct ck_ring_buffer *buffer,
442 void *data)
443 {
444
445 return _ck_ring_trydequeue_mc(ring, buffer, (void **)data, sizeof(void *));
446 }
447
448 CK_CC_INLINE static bool
ck_ring_dequeue_spmc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)449 ck_ring_dequeue_spmc(struct ck_ring *ring,
450 const struct ck_ring_buffer *buffer,
451 void *data)
452 {
453
454 return _ck_ring_dequeue_mc(ring, buffer, (void **)data, sizeof(void *));
455 }
456
/*
 * The ck_ring_*_mpsc namespace is the public interface for interacting with a
 * ring buffer containing pointers. Correctness is provided for any number of
 * producers with up to one concurrent consumer.
 */
462 CK_CC_INLINE static bool
ck_ring_enqueue_mpsc(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry)463 ck_ring_enqueue_mpsc(struct ck_ring *ring,
464 struct ck_ring_buffer *buffer,
465 const void *entry)
466 {
467
468 return _ck_ring_enqueue_mp(ring, buffer, &entry,
469 sizeof(entry), NULL);
470 }
471
472 CK_CC_INLINE static bool
ck_ring_enqueue_mpsc_size(struct ck_ring * ring,struct ck_ring_buffer * buffer,const void * entry,unsigned int * size)473 ck_ring_enqueue_mpsc_size(struct ck_ring *ring,
474 struct ck_ring_buffer *buffer,
475 const void *entry,
476 unsigned int *size)
477 {
478
479 return _ck_ring_enqueue_mp_size(ring, buffer, &entry,
480 sizeof(entry), size);
481 }
482
483 CK_CC_INLINE static bool
ck_ring_dequeue_mpsc(struct ck_ring * ring,const struct ck_ring_buffer * buffer,void * data)484 ck_ring_dequeue_mpsc(struct ck_ring *ring,
485 const struct ck_ring_buffer *buffer,
486 void *data)
487 {
488
489 return _ck_ring_dequeue_sc(ring, buffer, (void **)data,
490 sizeof(void *));
491 }
492
/*
 * CK_RING_PROTOTYPE is used to define a type-safe interface for inlining
 * values of a particular type in the ring buffer.
 *
 * CK_RING_PROTOTYPE(name, type) expands to a family of static inline
 * functions named ck_ring_<operation>_<name>. In each of them `a` is the
 * ring, `b` is the slot array (an array of `struct type`), `c` is the
 * object to enqueue or the destination of a dequeue, and `d`, where
 * present, receives the occupancy reported by the internal enqueue.
 * Every function forwards to the matching _ck_ring_* routine with a slot
 * size of sizeof(struct type).
 */
#define CK_RING_PROTOTYPE(name, type)			\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_sp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_sp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_spmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_spmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpsc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpsc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_sc(a, b, c,		\
	    sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_size_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c,					\
    unsigned int *d)					\
{							\
							\
	return _ck_ring_enqueue_mp_size(a, b, c,	\
	    sizeof(struct type), d);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_enqueue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_enqueue_mp(a, b, c,		\
	    sizeof(struct type), NULL);			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_trydequeue_mpmc_##name(struct ck_ring *a,	\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_trydequeue_mc(a,		\
	    b, c, sizeof(struct type));			\
}							\
							\
CK_CC_INLINE static bool				\
ck_ring_dequeue_mpmc_##name(struct ck_ring *a,		\
    struct type *b,					\
    struct type *c)					\
{							\
							\
	return _ck_ring_dequeue_mc(a, b, c,		\
	    sizeof(struct type));			\
}
641
/*
 * A single producer with one concurrent consumer.
 *
 * Each macro forwards its arguments to the function of the same
 * operation generated by CK_RING_PROTOTYPE(name, ...): `a` is the ring,
 * `b` the slot array, `c` the entry or destination and `d` the
 * occupancy output.
 */
#define CK_RING_ENQUEUE_SPSC(name, a, b, c)		\
	ck_ring_enqueue_spsc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_SPSC(name, a, b, c)		\
	ck_ring_dequeue_spsc_##name(a, b, c)
651
/*
 * A single producer with any number of concurrent consumers.
 *
 * Each macro forwards its arguments to the function of the same
 * operation generated by CK_RING_PROTOTYPE(name, ...): `a` is the ring,
 * `b` the slot array, `c` the entry or destination and `d` the
 * occupancy output.
 */
#define CK_RING_ENQUEUE_SPMC(name, a, b, c)		\
	ck_ring_enqueue_spmc_##name(a, b, c)
#define CK_RING_ENQUEUE_SPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_spmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_trydequeue_spmc_##name(a, b, c)
#define CK_RING_DEQUEUE_SPMC(name, a, b, c)		\
	ck_ring_dequeue_spmc_##name(a, b, c)
663
/*
 * Any number of concurrent producers with up to one
 * concurrent consumer.
 *
 * Each macro forwards its arguments to the function of the same
 * operation generated by CK_RING_PROTOTYPE(name, ...): `a` is the ring,
 * `b` the slot array, `c` the entry or destination and `d` the
 * occupancy output.
 */
#define CK_RING_ENQUEUE_MPSC(name, a, b, c)		\
	ck_ring_enqueue_mpsc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPSC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpsc_size_##name(a, b, c, d)
#define CK_RING_DEQUEUE_MPSC(name, a, b, c)		\
	ck_ring_dequeue_mpsc_##name(a, b, c)
674
/*
 * Any number of concurrent producers and consumers.
 *
 * Each macro forwards its arguments to the function of the same
 * operation generated by CK_RING_PROTOTYPE(name, ...): `a` is the ring,
 * `b` the slot array, `c` the entry or destination and `d` the
 * occupancy output.
 */
#define CK_RING_ENQUEUE_MPMC(name, a, b, c)		\
	ck_ring_enqueue_mpmc_##name(a, b, c)
#define CK_RING_ENQUEUE_MPMC_SIZE(name, a, b, c, d)	\
	ck_ring_enqueue_mpmc_size_##name(a, b, c, d)
#define CK_RING_TRYDEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_trydequeue_mpmc_##name(a, b, c)
#define CK_RING_DEQUEUE_MPMC(name, a, b, c)		\
	ck_ring_dequeue_mpmc_##name(a, b, c)
686
687 #endif /* CK_RING_H */
688