/*
 * Definitions for the 'struct ptr_ring' datastructure.
 *
 * Author:
 *	Michael S. Tsirkin <[email protected]>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from the FIFO.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full. Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
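
/*
 * Illustrative sketch, not part of the original API: queueing a batch of
 * entries while holding producer_lock, mirroring __ptr_ring_consume_batched
 * further below. The helper name __example_produce_batch is hypothetical;
 * the return value is the number of entries actually queued.
 */
static inline int __example_produce_batch(struct ptr_ring *r,
					  void **array, int n)
{
	int i;

	spin_lock(&r->producer_lock);
	for (i = 0; i < n; i++)
		if (__ptr_ring_produce(r, array[i]))
			break;	/* -ENOSPC: the ring is full */
	spin_unlock(&r->producer_lock);

	return i;
}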

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
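
/*
 * Illustrative sketch, not part of the original API: busy-polling until the
 * ring becomes non-empty. This assumes the ring is never resized, so
 * __ptr_ring_empty may be called without consumer_lock, and assumes
 * cpu_relax() is declared (it normally comes in via the spinlock/processor
 * headers); cpu_relax() supplies the compiler barrier the notes above
 * require between retries. The helper name example_wait_nonempty is
 * hypothetical.
 */
static inline void example_wait_nonempty(struct ptr_ring *r)
{
	while (__ptr_ring_empty(r))
		cpu_relax();
}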

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *      r->queue[r->consumer++] = NULL;
	 *      if (unlikely(r->consumer >= r->size))
	 *              r->consumer = 0;
	 * but that is suboptimal when the ring is full as the producer is
	 * writing out new entries in the same cache line. Defer these updates
	 * until a batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way the cache line
		 * that the producer might currently be reading is touched
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we write out all
		 * entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
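
/*
 * Illustrative sketch, not part of the original API: draining every entry
 * currently queued and handing each one to a caller-supplied callback,
 * using the locked consume API. The helper name example_drain and the
 * process() callback are hypothetical.
 */
static inline void example_drain(struct ptr_ring *r, void (*process)(void *))
{
	void *ptr;

	while ((ptr = ptr_ring_consume(r)))
		process(ptr);
}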

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
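
/*
 * Illustrative sketch, not part of the original API: peeking at the head
 * entry without dequeueing it via PTR_RING_PEEK_CALL. struct example_elem,
 * example_elem_len() and example_peek_len() are hypothetical stand-ins for
 * a real element type; the callback must cope with a NULL head (empty ring).
 */
struct example_elem {
	int len;
};

static inline int example_elem_len(struct example_elem *e)
{
	return e ? e->len : 0;
}

static inline int example_peek_len(struct ptr_ring *r)
{
	return PTR_RING_PEEK_CALL(r, example_elem_len);
}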

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	return kcalloc(size, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness. Needs tuning; for now disable batching in that case.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
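
/*
 * Illustrative sketch, not part of the original API: the basic lifecycle of
 * a ring - allocate, produce, consume, free. The size of 16 and the name
 * example_lifecycle() are hypothetical. ptr_ring_cleanup(), defined further
 * below, is the usual teardown; the ring is empty here, so freeing the
 * queue array directly is sufficient.
 */
static inline int example_lifecycle(void *obj)
{
	struct ptr_ring r;
	int err;

	err = ptr_ring_init(&r, 16, GFP_KERNEL);
	if (err)
		return err;

	err = ptr_ring_produce(&r, obj);	/* -ENOSPC if full */
	if (!err && ptr_ring_consume(&r) != obj)
		err = -EINVAL;

	kfree(r.queue);
	return err;
}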

/*
 * Return entries into the ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are
	 * valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over the entries in the batch, moving the head back and copying
	 * entries in. Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */