1 /* SPDX-License-Identifier: BSD-3-Clause
2 *
3 * Copyright (c) 2017,2018 HXT-semitech Corporation.
4 * Copyright (c) 2007-2009 Kip Macy [email protected]
5 * All rights reserved.
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
8 */
9
10 #ifndef _RTE_RING_ELEM_PVT_H_
11 #define _RTE_RING_ELEM_PVT_H_
12
13 static __rte_always_inline void
__rte_ring_enqueue_elems_32(struct rte_ring * r,const uint32_t size,uint32_t idx,const void * obj_table,uint32_t n)14 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
15 uint32_t idx, const void *obj_table, uint32_t n)
16 {
17 unsigned int i;
18 uint32_t *ring = (uint32_t *)&r[1];
19 const uint32_t *obj = (const uint32_t *)obj_table;
20 if (likely(idx + n <= size)) {
21 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
22 ring[idx] = obj[i];
23 ring[idx + 1] = obj[i + 1];
24 ring[idx + 2] = obj[i + 2];
25 ring[idx + 3] = obj[i + 3];
26 ring[idx + 4] = obj[i + 4];
27 ring[idx + 5] = obj[i + 5];
28 ring[idx + 6] = obj[i + 6];
29 ring[idx + 7] = obj[i + 7];
30 }
31 switch (n & 0x7) {
32 case 7:
33 ring[idx++] = obj[i++]; /* fallthrough */
34 case 6:
35 ring[idx++] = obj[i++]; /* fallthrough */
36 case 5:
37 ring[idx++] = obj[i++]; /* fallthrough */
38 case 4:
39 ring[idx++] = obj[i++]; /* fallthrough */
40 case 3:
41 ring[idx++] = obj[i++]; /* fallthrough */
42 case 2:
43 ring[idx++] = obj[i++]; /* fallthrough */
44 case 1:
45 ring[idx++] = obj[i++]; /* fallthrough */
46 }
47 } else {
48 for (i = 0; idx < size; i++, idx++)
49 ring[idx] = obj[i];
50 /* Start at the beginning */
51 for (idx = 0; i < n; i++, idx++)
52 ring[idx] = obj[i];
53 }
54 }
55
56 static __rte_always_inline void
__rte_ring_enqueue_elems_64(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t n)57 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
58 const void *obj_table, uint32_t n)
59 {
60 unsigned int i;
61 const uint32_t size = r->size;
62 uint32_t idx = prod_head & r->mask;
63 uint64_t *ring = (uint64_t *)&r[1];
64 const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
65 if (likely(idx + n <= size)) {
66 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
67 ring[idx] = obj[i];
68 ring[idx + 1] = obj[i + 1];
69 ring[idx + 2] = obj[i + 2];
70 ring[idx + 3] = obj[i + 3];
71 }
72 switch (n & 0x3) {
73 case 3:
74 ring[idx++] = obj[i++]; /* fallthrough */
75 case 2:
76 ring[idx++] = obj[i++]; /* fallthrough */
77 case 1:
78 ring[idx++] = obj[i++];
79 }
80 } else {
81 for (i = 0; idx < size; i++, idx++)
82 ring[idx] = obj[i];
83 /* Start at the beginning */
84 for (idx = 0; i < n; i++, idx++)
85 ring[idx] = obj[i];
86 }
87 }
88
89 static __rte_always_inline void
__rte_ring_enqueue_elems_128(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t n)90 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
91 const void *obj_table, uint32_t n)
92 {
93 unsigned int i;
94 const uint32_t size = r->size;
95 uint32_t idx = prod_head & r->mask;
96 rte_int128_t *ring = (rte_int128_t *)&r[1];
97 const rte_int128_t *obj = (const rte_int128_t *)obj_table;
98 if (likely(idx + n <= size)) {
99 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
100 memcpy((void *)(ring + idx),
101 (const void *)(obj + i), 32);
102 switch (n & 0x1) {
103 case 1:
104 memcpy((void *)(ring + idx),
105 (const void *)(obj + i), 16);
106 }
107 } else {
108 for (i = 0; idx < size; i++, idx++)
109 memcpy((void *)(ring + idx),
110 (const void *)(obj + i), 16);
111 /* Start at the beginning */
112 for (idx = 0; i < n; i++, idx++)
113 memcpy((void *)(ring + idx),
114 (const void *)(obj + i), 16);
115 }
116 }
117
118 /* the actual enqueue of elements on the ring.
119 * Placed here since identical code needed in both
120 * single and multi producer enqueue functions.
121 */
122 static __rte_always_inline void
__rte_ring_enqueue_elems(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t esize,uint32_t num)123 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
124 const void *obj_table, uint32_t esize, uint32_t num)
125 {
126 /* 8B and 16B copies implemented individually to retain
127 * the current performance.
128 */
129 if (esize == 8)
130 __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
131 else if (esize == 16)
132 __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
133 else {
134 uint32_t idx, scale, nr_idx, nr_num, nr_size;
135
136 /* Normalize to uint32_t */
137 scale = esize / sizeof(uint32_t);
138 nr_num = num * scale;
139 idx = prod_head & r->mask;
140 nr_idx = idx * scale;
141 nr_size = r->size * scale;
142 __rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
143 obj_table, nr_num);
144 }
145 }
146
147 static __rte_always_inline void
__rte_ring_dequeue_elems_32(struct rte_ring * r,const uint32_t size,uint32_t idx,void * obj_table,uint32_t n)148 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
149 uint32_t idx, void *obj_table, uint32_t n)
150 {
151 unsigned int i;
152 uint32_t *ring = (uint32_t *)&r[1];
153 uint32_t *obj = (uint32_t *)obj_table;
154 if (likely(idx + n <= size)) {
155 for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
156 obj[i] = ring[idx];
157 obj[i + 1] = ring[idx + 1];
158 obj[i + 2] = ring[idx + 2];
159 obj[i + 3] = ring[idx + 3];
160 obj[i + 4] = ring[idx + 4];
161 obj[i + 5] = ring[idx + 5];
162 obj[i + 6] = ring[idx + 6];
163 obj[i + 7] = ring[idx + 7];
164 }
165 switch (n & 0x7) {
166 case 7:
167 obj[i++] = ring[idx++]; /* fallthrough */
168 case 6:
169 obj[i++] = ring[idx++]; /* fallthrough */
170 case 5:
171 obj[i++] = ring[idx++]; /* fallthrough */
172 case 4:
173 obj[i++] = ring[idx++]; /* fallthrough */
174 case 3:
175 obj[i++] = ring[idx++]; /* fallthrough */
176 case 2:
177 obj[i++] = ring[idx++]; /* fallthrough */
178 case 1:
179 obj[i++] = ring[idx++]; /* fallthrough */
180 }
181 } else {
182 for (i = 0; idx < size; i++, idx++)
183 obj[i] = ring[idx];
184 /* Start at the beginning */
185 for (idx = 0; i < n; i++, idx++)
186 obj[i] = ring[idx];
187 }
188 }
189
190 static __rte_always_inline void
__rte_ring_dequeue_elems_64(struct rte_ring * r,uint32_t prod_head,void * obj_table,uint32_t n)191 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
192 void *obj_table, uint32_t n)
193 {
194 unsigned int i;
195 const uint32_t size = r->size;
196 uint32_t idx = prod_head & r->mask;
197 uint64_t *ring = (uint64_t *)&r[1];
198 unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
199 if (likely(idx + n <= size)) {
200 for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
201 obj[i] = ring[idx];
202 obj[i + 1] = ring[idx + 1];
203 obj[i + 2] = ring[idx + 2];
204 obj[i + 3] = ring[idx + 3];
205 }
206 switch (n & 0x3) {
207 case 3:
208 obj[i++] = ring[idx++]; /* fallthrough */
209 case 2:
210 obj[i++] = ring[idx++]; /* fallthrough */
211 case 1:
212 obj[i++] = ring[idx++]; /* fallthrough */
213 }
214 } else {
215 for (i = 0; idx < size; i++, idx++)
216 obj[i] = ring[idx];
217 /* Start at the beginning */
218 for (idx = 0; i < n; i++, idx++)
219 obj[i] = ring[idx];
220 }
221 }
222
223 static __rte_always_inline void
__rte_ring_dequeue_elems_128(struct rte_ring * r,uint32_t prod_head,void * obj_table,uint32_t n)224 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
225 void *obj_table, uint32_t n)
226 {
227 unsigned int i;
228 const uint32_t size = r->size;
229 uint32_t idx = prod_head & r->mask;
230 rte_int128_t *ring = (rte_int128_t *)&r[1];
231 rte_int128_t *obj = (rte_int128_t *)obj_table;
232 if (likely(idx + n <= size)) {
233 for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
234 memcpy((void *)(obj + i), (void *)(ring + idx), 32);
235 switch (n & 0x1) {
236 case 1:
237 memcpy((void *)(obj + i), (void *)(ring + idx), 16);
238 }
239 } else {
240 for (i = 0; idx < size; i++, idx++)
241 memcpy((void *)(obj + i), (void *)(ring + idx), 16);
242 /* Start at the beginning */
243 for (idx = 0; i < n; i++, idx++)
244 memcpy((void *)(obj + i), (void *)(ring + idx), 16);
245 }
246 }
247
/* The actual dequeue of elements from the ring.
 * Placed here since identical code is needed in both
 * the single and multi consumer dequeue functions.
 */
252 static __rte_always_inline void
__rte_ring_dequeue_elems(struct rte_ring * r,uint32_t cons_head,void * obj_table,uint32_t esize,uint32_t num)253 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
254 void *obj_table, uint32_t esize, uint32_t num)
255 {
256 /* 8B and 16B copies implemented individually to retain
257 * the current performance.
258 */
259 if (esize == 8)
260 __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
261 else if (esize == 16)
262 __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
263 else {
264 uint32_t idx, scale, nr_idx, nr_num, nr_size;
265
266 /* Normalize to uint32_t */
267 scale = esize / sizeof(uint32_t);
268 nr_num = num * scale;
269 idx = cons_head & r->mask;
270 nr_idx = idx * scale;
271 nr_size = r->size * scale;
272 __rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
273 obj_table, nr_num);
274 }
275 }
276
/* Between two loads there may be CPU reordering on weakly ordered
 * memory models (PowerPC/Arm).
 * There are two choices for users:
 * 1. use an rmb() memory barrier
 * 2. use one-direction load_acquire/store_release barriers
 * Which performs better depends on performance test results.
 */
284 #ifdef RTE_USE_C11_MEM_MODEL
285 #include "rte_ring_c11_pvt.h"
286 #else
287 #include "rte_ring_generic_pvt.h"
288 #endif
289
290 /**
291 * @internal Enqueue several objects on the ring
292 *
293 * @param r
294 * A pointer to the ring structure.
295 * @param obj_table
296 * A pointer to a table of objects.
297 * @param esize
298 * The size of ring element, in bytes. It must be a multiple of 4.
299 * This must be the same value used while creating the ring. Otherwise
300 * the results are undefined.
301 * @param n
302 * The number of objects to add in the ring from the obj_table.
303 * @param behavior
 *   RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
306 * @param is_sp
307 * Indicates whether to use single producer or multi-producer head update
308 * @param free_space
309 * returns the amount of space after the enqueue operation has finished
310 * @return
311 * Actual number of objects enqueued.
312 * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
313 */
314 static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring * r,const void * obj_table,unsigned int esize,unsigned int n,enum rte_ring_queue_behavior behavior,unsigned int is_sp,unsigned int * free_space)315 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
316 unsigned int esize, unsigned int n,
317 enum rte_ring_queue_behavior behavior, unsigned int is_sp,
318 unsigned int *free_space)
319 {
320 uint32_t prod_head, prod_next;
321 uint32_t free_entries;
322
323 n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
324 &prod_head, &prod_next, &free_entries);
325 if (n == 0)
326 goto end;
327
328 __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
329
330 __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
331 end:
332 if (free_space != NULL)
333 *free_space = free_entries - n;
334 return n;
335 }
336
337 /**
338 * @internal Dequeue several objects from the ring
339 *
340 * @param r
341 * A pointer to the ring structure.
342 * @param obj_table
343 * A pointer to a table of objects.
344 * @param esize
345 * The size of ring element, in bytes. It must be a multiple of 4.
346 * This must be the same value used while creating the ring. Otherwise
347 * the results are undefined.
348 * @param n
349 * The number of objects to pull from the ring.
350 * @param behavior
351 * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
352 * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
353 * @param is_sc
354 * Indicates whether to use single consumer or multi-consumer head update
355 * @param available
356 * returns the number of remaining ring entries after the dequeue has finished
357 * @return
358 * - Actual number of objects dequeued.
359 * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
360 */
361 static __rte_always_inline unsigned int
__rte_ring_do_dequeue_elem(struct rte_ring * r,void * obj_table,unsigned int esize,unsigned int n,enum rte_ring_queue_behavior behavior,unsigned int is_sc,unsigned int * available)362 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
363 unsigned int esize, unsigned int n,
364 enum rte_ring_queue_behavior behavior, unsigned int is_sc,
365 unsigned int *available)
366 {
367 uint32_t cons_head, cons_next;
368 uint32_t entries;
369
370 n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
371 &cons_head, &cons_next, &entries);
372 if (n == 0)
373 goto end;
374
375 __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
376
377 __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
378
379 end:
380 if (available != NULL)
381 *available = entries - n;
382 return n;
383 }
384
385 #endif /* _RTE_RING_ELEM_PVT_H_ */
386