/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2017 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy [email protected]
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_GENERIC_H_
#define _RTE_RING_GENERIC_H_

13 static __rte_always_inline void
update_tail(struct rte_ring_headtail * ht,uint32_t old_val,uint32_t new_val,uint32_t single,uint32_t enqueue)14 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
15 		uint32_t single, uint32_t enqueue)
16 {
17 	if (enqueue)
18 		rte_smp_wmb();
19 	else
20 		rte_smp_rmb();
21 	/*
22 	 * If there are other enqueues/dequeues in progress that preceded us,
23 	 * we need to wait for them to complete
24 	 */
25 	if (!single)
26 		while (unlikely(ht->tail != old_val))
27 			rte_pause();
28 
29 	ht->tail = new_val;
30 }
31 
32 /**
33  * @internal This function updates the producer head for enqueue
34  *
35  * @param r
36  *   A pointer to the ring structure
37  * @param is_sp
38  *   Indicates whether multi-producer path is needed or not
39  * @param n
40  *   The number of elements we will want to enqueue, i.e. how far should the
41  *   head be moved
42  * @param behavior
43  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
44  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
45  * @param old_head
46  *   Returns head value as it was before the move, i.e. where enqueue starts
47  * @param new_head
48  *   Returns the current/new head value i.e. where enqueue finishes
49  * @param free_entries
50  *   Returns the amount of free space in the ring BEFORE head was moved
51  * @return
52  *   Actual number of objects enqueued.
53  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
54  */
55 static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring * r,unsigned int is_sp,unsigned int n,enum rte_ring_queue_behavior behavior,uint32_t * old_head,uint32_t * new_head,uint32_t * free_entries)56 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
57 		unsigned int n, enum rte_ring_queue_behavior behavior,
58 		uint32_t *old_head, uint32_t *new_head,
59 		uint32_t *free_entries)
60 {
61 	const uint32_t capacity = r->capacity;
62 	unsigned int max = n;
63 	int success;
64 
65 	do {
66 		/* Reset n to the initial burst count */
67 		n = max;
68 
69 		*old_head = r->prod.head;
70 
71 		/* add rmb barrier to avoid load/load reorder in weak
72 		 * memory model. It is noop on x86
73 		 */
74 		rte_smp_rmb();
75 
76 		/*
77 		 *  The subtraction is done between two unsigned 32bits value
78 		 * (the result is always modulo 32 bits even if we have
79 		 * *old_head > cons_tail). So 'free_entries' is always between 0
80 		 * and capacity (which is < size).
81 		 */
82 		*free_entries = (capacity + r->cons.tail - *old_head);
83 
84 		/* check that we have enough room in ring */
85 		if (unlikely(n > *free_entries))
86 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
87 					0 : *free_entries;
88 
89 		if (n == 0)
90 			return 0;
91 
92 		*new_head = *old_head + n;
93 		if (is_sp)
94 			r->prod.head = *new_head, success = 1;
95 		else
96 			success = rte_atomic32_cmpset(&r->prod.head,
97 					*old_head, *new_head);
98 	} while (unlikely(success == 0));
99 	return n;
100 }
101 
102 /**
103  * @internal This function updates the consumer head for dequeue
104  *
105  * @param r
106  *   A pointer to the ring structure
107  * @param is_sc
108  *   Indicates whether multi-consumer path is needed or not
109  * @param n
110  *   The number of elements we will want to enqueue, i.e. how far should the
111  *   head be moved
112  * @param behavior
113  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
114  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
115  * @param old_head
116  *   Returns head value as it was before the move, i.e. where dequeue starts
117  * @param new_head
118  *   Returns the current/new head value i.e. where dequeue finishes
119  * @param entries
120  *   Returns the number of entries in the ring BEFORE head was moved
121  * @return
122  *   - Actual number of objects dequeued.
123  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
124  */
125 static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring * r,unsigned int is_sc,unsigned int n,enum rte_ring_queue_behavior behavior,uint32_t * old_head,uint32_t * new_head,uint32_t * entries)126 __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
127 		unsigned int n, enum rte_ring_queue_behavior behavior,
128 		uint32_t *old_head, uint32_t *new_head,
129 		uint32_t *entries)
130 {
131 	unsigned int max = n;
132 	int success;
133 
134 	/* move cons.head atomically */
135 	do {
136 		/* Restore n as it may change every loop */
137 		n = max;
138 
139 		*old_head = r->cons.head;
140 
141 		/* add rmb barrier to avoid load/load reorder in weak
142 		 * memory model. It is noop on x86
143 		 */
144 		rte_smp_rmb();
145 
146 		/* The subtraction is done between two unsigned 32bits value
147 		 * (the result is always modulo 32 bits even if we have
148 		 * cons_head > prod_tail). So 'entries' is always between 0
149 		 * and size(ring)-1.
150 		 */
151 		*entries = (r->prod.tail - *old_head);
152 
153 		/* Set the actual entries for dequeue */
154 		if (n > *entries)
155 			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
156 
157 		if (unlikely(n == 0))
158 			return 0;
159 
160 		*new_head = *old_head + n;
161 		if (is_sc) {
162 			r->cons.head = *new_head;
163 			rte_smp_rmb();
164 			success = 1;
165 		} else {
166 			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
167 					*new_head);
168 		}
169 	} while (unlikely(success == 0));
170 	return n;
171 }
172 
#endif /* _RTE_RING_GENERIC_H_ */