/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <[email protected]>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another CPU consuming them.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
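/* Example (illustrative sketch, not part of the kernel API): a producer that
 * holds producer_lock across a fullness check and the produce itself, so the
 * check cannot be invalidated by a concurrent resize. The function name and
 * the "drop on full" policy are hypothetical.
 */
static inline int ptr_ring_example_try_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	if (__ptr_ring_full(r))
		ret = -ENOSPC;	/* caller may drop ptr or retry after cpu_relax() */
	else
		ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}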
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
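/* Example (illustrative sketch, not part of the kernel API): using the
 * lockless test above as a producer-side hint for waking a sleeping consumer.
 * A stale "not empty" result only causes a spurious wakeup; a stale "empty"
 * result is avoided by having the consumer drain the ring until
 * __ptr_ring_empty() returns true before it sleeps. The function name and the
 * wakeup scheme are hypothetical.
 */
static inline bool ptr_ring_example_needs_kick(struct ptr_ring *r)
{
	return !__ptr_ring_empty(r);
}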
/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	consumer = r->consumer;
	 *	r->queue[consumer++] = NULL;
	 *	if (unlikely(consumer >= r->size))
	 *		consumer = 0;
	 *	r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line. Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for
	 * __ptr_ring_empty to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in the reverse order: this way we touch the
		 * cache line that producer might currently be reading last;
		 * producer won't make progress and touch other cache lines
		 * besides the first one until we write out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	r->consumer_head = consumer_head;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
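/* Example (illustrative sketch, not part of the kernel API): draining up to a
 * fixed budget of entries in a single lock acquisition and processing them
 * after the lock is dropped. The batch size, function name and process
 * callback are hypothetical.
 */
static inline void ptr_ring_example_process_batch(struct ptr_ring *r,
						  void (*process)(void *))
{
	void *batch[16];
	int n, i;

	spin_lock(&r->consumer_lock);
	n = __ptr_ring_consume_batched(r, batch, 16);
	spin_unlock(&r->consumer_lock);

	/* Process outside the lock to keep the consumer critical section short. */
	for (i = 0; i < n; i++)
		process(batch[i]);
}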
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
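/* Example (illustrative sketch, not part of the kernel API): the lock-nesting
 * rule above in practice. When the consumer runs in BH context (e.g. a
 * tasklet or NAPI poll), the producer and any resizer must keep BH disabled,
 * otherwise a BH resize, which takes the consumer lock and then the producer
 * lock, could spin on a lock already held on this CPU. The function names are
 * hypothetical.
 */
static inline int ptr_ring_example_queue_from_task(struct ptr_ring *r, void *item)
{
	/* Producer in process context: use the BH-disabling variant. */
	return ptr_ring_produce_bh(r, item);
}

static inline void *ptr_ring_example_poll_from_bh(struct ptr_ring *r)
{
	/* Consumer called from BH context: BH is already disabled here,
	 * so the plain locked variant is sufficient.
	 */
	return ptr_ring_consume(r);
}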
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	/* Allocate an extra dummy element at the end of the ring to avoid
	 * consumer head or producer head access past the end of the array.
	 * Possible when producer/consumer operations and __ptr_ring_peek
	 * operations run in parallel.
	 */
	return kcalloc(size + 1, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
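/* Example (illustrative sketch, not part of the kernel API): the typical
 * lifecycle of a ring with a single producer and a single consumer, both in
 * process context. The element size, ring size and error handling are
 * hypothetical; ptr_ring_cleanup() (defined at the end of this file) would
 * normally be used instead of the explicit drain-and-free shown here.
 */
static inline int ptr_ring_example_lifecycle(void)
{
	struct ptr_ring r;
	void *item, *obj;
	int err;

	err = ptr_ring_init(&r, 64, GFP_KERNEL);
	if (err)
		return err;

	item = kmalloc(32, GFP_KERNEL);
	if (item && ptr_ring_produce(&r, item)) {
		/* -ENOSPC: the ring is full and ownership stays with us. */
		kfree(item);
	}

	while ((obj = ptr_ring_consume(&r)))
		kfree(obj);

	kfree(r.queue);	/* what ptr_ring_cleanup(&r, NULL) would do */
	return 0;
}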
/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
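/* Example (illustrative sketch, not part of the kernel API): resizing a ring
 * at runtime from process context. Entries that do not fit in the new,
 * possibly smaller queue are handed to the destroy callback. The function
 * names and the kfree-based destructor are hypothetical.
 */
static inline void ptr_ring_example_free_entry(void *ptr)
{
	kfree(ptr);
}

static inline int ptr_ring_example_resize(struct ptr_ring *r, int new_size)
{
	return ptr_ring_resize(r, new_size, GFP_KERNEL,
			       ptr_ring_example_free_entry);
}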
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */