xref: /linux-6.15/include/linux/ptr_ring.h (revision e00a844a)
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <[email protected]>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
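
/* Illustration (a sketch of the invariant the layout above relies on): a
 * slot is free iff queue[i] == NULL, so the producer and the consumer never
 * need to read each other's index.  For example, with size == 4:
 *
 *	queue:  [ A ][ B ][NULL][NULL]
 *	          ^consumer_head == 0 (next valid entry)
 *	                    ^producer == 2 (next slot to fill)
 *
 * The producer only tests queue[producer] and the consumer only tests
 * queue[consumer_head], which is what keeps the two sides in separate
 * cache lines most of the time.
 */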

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().  If ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
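
/* Example (an illustrative sketch; foo_blocking_produce() is a made-up
 * helper): retry until a slot frees up, holding producer_lock and using
 * cpu_relax() between attempts as the note above requires:
 *
 *	static int foo_blocking_produce(struct ptr_ring *r, void *obj)
 *	{
 *		int err;
 *
 *		spin_lock(&r->producer_lock);
 *		while ((err = __ptr_ring_produce(r, obj)) == -ENOSPC)
 *			cpu_relax();
 *		spin_unlock(&r->producer_lock);
 *		return err;
 *	}
 *
 * The consumer never takes producer_lock, so it can keep freeing slots
 * while this loop spins.
 */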

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
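
/* Example (a sketch; foo_dev and its ring are hypothetical): when the
 * consumer side runs in BH context, process-context producers should use
 * the _bh variant so that the resize path, which takes producer_lock under
 * consumer_lock, cannot deadlock against a producer interrupted by the
 * consuming BH:
 *
 *	if (ptr_ring_produce_bh(&foo_dev->ring, obj))
 *		return -ENOSPC;
 */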

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer_head];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}
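
/* Example (sketch only; foo and its napi member are hypothetical): since a
 * non-NULL head entry is all that "not empty" means, a ring that is never
 * resized can be tested locklessly, e.g. to decide whether to kick the
 * consumer at all:
 *
 *	if (!__ptr_ring_empty(&foo->ring))
 *		napi_schedule(&foo->napi);
 */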

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update the consumer
	 * index and zero out the entry so the producer can reuse it.
	 * Doing that naively at each consume would be as simple as:
	 *       r->queue[r->consumer++] = NULL;
	 *       if (unlikely(r->consumer >= r->size))
	 *               r->consumer = 0;
	 * but that is suboptimal when the ring is full, as the producer is
	 * writing out new entries in the same cache line.  Defer these
	 * updates until a batch of entries has been consumed.
	 */
	int head = r->consumer_head++;

	/* Once we have processed enough entries, invalidate them in
	 * the ring all at once so the producer can reuse their space.
	 * We also do this when we reach the end of the ring - not mandatory
	 * but it helps keep the implementation simple.
	 */
	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
		     r->consumer_head >= r->size)) {
		/* Zero out entries in reverse order: this way we touch the
		 * cache line that the producer might currently be reading
		 * last; the producer won't make progress and touch other
		 * cache lines besides the first one until we have written
		 * out all entries.
		 */
		while (likely(head >= r->consumer_tail))
			r->queue[head--] = NULL;
		r->consumer_tail = r->consumer_head;
	}
	if (unlikely(r->consumer_head >= r->size)) {
		r->consumer_head = 0;
		r->consumer_tail = 0;
	}
}
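
/* Worked example of the batching above (illustrative numbers, assuming
 * 8-byte pointers and 64-byte cache lines, so batch == 16 for a large ring;
 * see __ptr_ring_set_size below): starting from consumer_head ==
 * consumer_tail == 0, the first 15 discards only advance consumer_head.
 * The 16th discard sees consumer_head - consumer_tail == 16 >= batch,
 * NULLs out entries 15 down to 0, and sets consumer_tail = consumer_head
 * = 16, so the producer sees space freed 16 slots at a time instead of
 * one slot per consume.
 */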

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
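
/* Example (a sketch; FOO_BUDGET and foo_handle() are hypothetical): drain
 * up to a fixed budget into an on-stack array in one locked section, then
 * process the entries without holding consumer_lock:
 *
 *	void *objs[FOO_BUDGET];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched_bh(&foo->ring, objs, FOO_BUDGET);
 *	for (i = 0; i < n; i++)
 *		foo_handle(objs[i]);
 */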

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
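
/* Example (a sketch; struct foo_buf and foo_len() are hypothetical): query
 * the head entry without dequeueing it.  The callback receives whatever
 * __ptr_ring_peek returns, which is NULL on an empty ring, so it must
 * tolerate NULL and return a value either way:
 *
 *	static int foo_len(void *ptr)
 *	{
 *		return ptr ? ((struct foo_buf *)ptr)->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(&foo->ring, foo_len);
 */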

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	return kcalloc(size, sizeof(void *), gfp);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness.  Needs tuning; for now disable batching for small rings.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
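
/* Worked example (assuming 64-byte cache lines and 8-byte pointers, so the
 * initial batch is 2 * 64 / 8 == 16): a ring of size 256 keeps batch == 16,
 * i.e. two cache lines of entries are invalidated per flush; a ring of
 * size 16 trips the r->batch > r->size / 2 test (16 > 8) and falls back to
 * batch == 1, effectively disabling batching for small rings.
 */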

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
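
/* Example (an end-to-end sketch; the foo_* names are hypothetical): a ring
 * feeding objects from process context to a BH consumer.  The producer uses
 * the _bh variant because the consumer runs in BH and the resize path nests
 * the locks (see the notes above):
 *
 *	struct foo {
 *		struct ptr_ring ring;
 *	};
 *
 *	static int foo_setup(struct foo *f)
 *	{
 *		return ptr_ring_init(&f->ring, 256, GFP_KERNEL);
 *	}
 *
 *	static int foo_queue(struct foo *f, void *obj)	// process context
 *	{
 *		return ptr_ring_produce_bh(&f->ring, obj);
 *	}
 *
 *	static void foo_drain(struct foo *f)		// BH context
 *	{
 *		void *obj;
 *
 *		while ((obj = ptr_ring_consume(&f->ring)))
 *			foo_handle(obj);
 *	}
 *
 *	static void foo_teardown(struct foo *f)
 *	{
 *		ptr_ring_cleanup(&f->ring, foo_free);
 *	}
 */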

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if an entry is non-NULL, assume
	 * it is valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
		r->queue[head--] = NULL;
	r->consumer_tail = r->consumer_head;

	/*
	 * Go over the entries in batch, moving the head back and copying
	 * entries into the ring. Stop when we run into previously unconsumed
	 * entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = r->consumer_head = head;
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
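
/* Example (a sketch; names are hypothetical): a consumer pulled a batch but
 * could not finish it, so the unprocessed tail is pushed back to the front
 * of the queue in order; anything that no longer fits goes to the destroy
 * callback:
 *
 *	n = ptr_ring_consume_batched_bh(&foo->ring, objs, FOO_BUDGET);
 *	done = foo_process_some(objs, n);
 *	ptr_ring_unconsume(&foo->ring, objs + done, n - done, foo_free);
 */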

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
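
/* Example (a sketch; foo_free and the new size are arbitrary): grow a ring
 * to 512 entries.  On allocation failure the old ring is left untouched;
 * when shrinking, entries that do not fit are passed to the destroy
 * callback:
 *
 *	err = ptr_ring_resize(&foo->ring, 512, GFP_KERNEL, foo_free);
 *	if (err)
 *		return err;
 */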

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */