/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <[email protected]>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another CPU consuming them.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
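/* Example (illustrative sketch, not part of the original header): a single
 * producer running in process context can push an entry and treat -ENOSPC
 * as "ring full", e.g. by dropping the entry. The obj pointer and drop()
 * helper are hypothetical:
 *
 *	spin_lock(&r->producer_lock);
 *	if (__ptr_ring_produce(r, obj))
 *		drop(obj);
 *	spin_unlock(&r->producer_lock);
 *
 * The locked wrappers below (ptr_ring_produce and friends) implement the
 * lock/produce/unlock part of this pattern for the various calling contexts.
 */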
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 * However, if called outside the lock, and if some other CPU
 * consumes ring entries at the same time, the value returned
 * is not guaranteed to be correct.
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * execute __ptr_ring_peek and/or consume the ring entries
 * after the synchronization point.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* See __ptr_ring_peek above for locking rules. */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}
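/* Example (illustrative sketch, not part of the original header): per the
 * locking rules above, the sole consumer CPU may test for emptiness without
 * taking consumer_lock, provided the ring is never resized. A hypothetical
 * poll loop, with process() standing in for real work, could look like:
 *
 *	while (!__ptr_ring_empty(r)) {
 *		void *obj = ptr_ring_consume(r);
 *
 *		process(obj);
 *	}
 *
 * Any other CPU must either take consumer_lock (e.g. via ptr_ring_empty
 * below) or accept that the result may be stale.
 */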
static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	consumer_head = r->consumer_head;
	 *	r->queue[consumer_head++] = NULL;
	 *	if (unlikely(consumer_head >= r->size))
	 *		consumer_head = 0;
	 *	r->consumer_head = consumer_head;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space in the
	 * ring. We also do this when we reach the end of the ring - not
	 * mandatory, but it helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way we touch the
		 * cache line that the producer might currently be reading
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we write out all
		 * entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	r->consumer_head = consumer_head;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}
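/* Example (illustrative sketch, not part of the original header): the
 * smp_wmb() in __ptr_ring_produce pairs with the smp_read_barrier_depends()
 * above, so a consumer dereferencing the returned pointer observes all
 * stores the producer made to the pointed-to object before queueing it.
 * The struct item fields and use() helper are hypothetical:
 *
 *	Producer CPU:
 *		it->seq = 42;
 *		ptr_ring_produce(r, it);
 *
 *	Consumer CPU:
 *		it = ptr_ring_consume(r);
 *		if (it)
 *			use(it->seq);	guaranteed to observe 42
 */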
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
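/* Example (illustrative sketch, not part of the original header): batched
 * consumption amortizes the consumer_lock cost, e.g. when draining from BH
 * context. The BATCH constant and process() helper are hypothetical:
 *
 *	void *batch[BATCH];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched_bh(r, batch, BATCH);
 *	for (i = 0; i < n; i++)
 *		process(batch[i]);
 */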
/* Cast to structure type and call a function without discarding from the
 * FIFO. The function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	/* Allocate an extra dummy element at the end of the ring to avoid
	 * consumer head or producer head access past the end of the array.
	 * Possible when producer/consumer operations and __ptr_ring_peek
	 * operations run in parallel.
	 */
	return kcalloc(size + 1, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
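/* Example (illustrative sketch, not part of the original header): typical
 * lifecycle of a ring, with a hypothetical free_obj() destructor passed to
 * ptr_ring_cleanup (defined at the end of this file) to free any entries
 * still queued at teardown:
 *
 *	struct ptr_ring ring;
 *	int err = ptr_ring_init(&ring, 256, GFP_KERNEL);
 *
 *	if (err)
 *		return err;
 *	...
 *	ptr_ring_produce(&ring, obj);		producer side
 *	obj = ptr_ring_consume(&ring);		consumer side
 *	...
 *	ptr_ring_cleanup(&ring, free_obj);
 */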
/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
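/* Example (illustrative sketch, not part of the original header): growing a
 * ring at runtime while producers and consumers may be active; the
 * hypothetical free_obj() destructor is only called for entries that no
 * longer fit in the new queue:
 *
 *	err = ptr_ring_resize(&ring, new_size, GFP_KERNEL, free_obj);
 *
 * Because resize nests the producer lock inside the consumer lock, producers
 * and consumers running in interrupt or BH context must use the
 * _irq/_bh/_any variants so the nesting rules above are respected.
 */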
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */