/*
 * Definitions for the 'struct ptr_ring' data structure.
 *
 * Author:
 *	Michael S. Tsirkin <[email protected]>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming them.
 *
 * This implementation tries to minimize cache contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
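/* Illustrative sketch, not part of the ptr_ring API: the comment above asks
 * callers that poll __ptr_ring_full() in a loop to insert a compiler barrier
 * such as cpu_relax(). A hypothetical busy-wait helper, assuming the caller
 * already holds producer_lock so that a concurrent resize cannot occur:
 */
static inline void ptr_ring_example_wait_not_full(struct ptr_ring *r)
{
	while (__ptr_ring_full(r))
		cpu_relax();
}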
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
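/* Illustrative sketch, not part of the ptr_ring API: a hypothetical producer
 * running in process context whose consumer runs in BH context, so the _bh
 * variant is used as the resize note above requires. On -ENOSPC the object is
 * dropped and freed rather than retried; kfree() is an assumption that fits
 * rings holding kmalloc()ed objects.
 */
static inline bool ptr_ring_example_send_or_drop(struct ptr_ring *r, void *obj)
{
	if (ptr_ring_produce_bh(r, obj)) {
		kfree(obj);
		return false;
	}
	return true;
}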
/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *	r->queue[r->consumer++] = NULL;
	 *	if (unlikely(r->consumer >= r->size))
	 *		r->consumer = 0;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we have written out
		 * all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}
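/* Illustrative sketch, not part of the ptr_ring API: draining the ring with
 * the unlocked helpers above while holding the consumer lock explicitly, and
 * handing every entry to a hypothetical callback. This assumes a
 * process-context consumer; the locked wrappers below package the same
 * pattern for the common locking contexts.
 */
static inline void ptr_ring_example_drain(struct ptr_ring *r,
					  void (*process)(void *))
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	while ((ptr = __ptr_ring_consume(r)))
		process(ptr);
	spin_unlock(&r->consumer_lock);
}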
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
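/* Illustrative sketch, not part of the ptr_ring API: consuming a bounded
 * burst per call so the consumer lock is not held for too long. The burst
 * size and the processing callback are hypothetical.
 */
#define PTR_RING_EXAMPLE_BURST 16

static inline void ptr_ring_example_poll(struct ptr_ring *r,
					 void (*process)(void *))
{
	void *batch[PTR_RING_EXAMPLE_BURST];
	int n, i;

	n = ptr_ring_consume_batched(r, batch, PTR_RING_EXAMPLE_BURST);
	for (i = 0; i < n; i++)
		process(batch[i]);
}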
/* Cast to structure type and call a function without discarding from the
 * FIFO. The function must return a value. Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	/* Allocate an extra dummy element at the end of the ring to avoid
	 * consumer head or producer head accesses past the end of the array.
	 * Possible when producer/consumer operations and __ptr_ring_peek
	 * operations run in parallel.
	 */
	return kcalloc(size + 1, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now, disable batching in that case.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
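/* Illustrative sketch, not part of the ptr_ring API: a minimal life cycle on
 * a freshly initialized ring. The ring size, GFP flags, object size and the
 * use of kfree() are assumptions made for the example. Once the ring has been
 * drained, freeing r.queue directly is equivalent to ptr_ring_cleanup()
 * (defined at the end of this header) on an empty ring.
 */
static inline int ptr_ring_example_lifecycle(void)
{
	struct ptr_ring r;
	void *obj, *item;
	int err;

	err = ptr_ring_init(&r, 128, GFP_KERNEL);
	if (err)
		return err;

	obj = kmalloc(16, GFP_KERNEL);
	if (obj && ptr_ring_produce(&r, obj)) {
		/* -ENOSPC: the ring is full, drop the object. */
		kfree(obj);
	}

	while ((item = ptr_ring_consume(&r)))
		kfree(item);

	kfree(r.queue);
	return 0;
}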
/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over the entries in the batch, start moving the head back and
	 * copy the entries. Stop when we run into previously unconsumed
	 * entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
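/* Illustrative sketch, not part of the ptr_ring API: a consumer pulls a burst
 * of entries, processes them with a hypothetical callback that may fail, and
 * returns the unprocessed tail to the ring; anything the ring can no longer
 * hold is freed by the destroy callback, as ptr_ring_unconsume documents.
 * The burst size and kfree() destructor are assumptions.
 */
static inline void ptr_ring_example_process_or_requeue(struct ptr_ring *r,
							bool (*process)(void *))
{
	void *batch[16];
	int n, i;

	n = ptr_ring_consume_batched_bh(r, batch, 16);
	for (i = 0; i < n; i++)
		if (!process(batch[i]))
			break;
	if (i < n)
		ptr_ring_unconsume(r, batch + i, n - i, kfree);
}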
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */