xref: /dpdk/lib/ring/rte_ring_elem_pvt.h (revision 97ed4cb6)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2017,2018 HXT-semitech Corporation.
4  * Copyright (c) 2007-2009 Kip Macy [email protected]
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_ELEM_PVT_H_
11 #define _RTE_RING_ELEM_PVT_H_
12 
13 static __rte_always_inline void
__rte_ring_enqueue_elems_32(struct rte_ring * r,const uint32_t size,uint32_t idx,const void * obj_table,uint32_t n)14 __rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
15 		uint32_t idx, const void *obj_table, uint32_t n)
16 {
17 	unsigned int i;
18 	uint32_t *ring = (uint32_t *)&r[1];
19 	const uint32_t *obj = (const uint32_t *)obj_table;
20 	if (likely(idx + n <= size)) {
21 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
22 			ring[idx] = obj[i];
23 			ring[idx + 1] = obj[i + 1];
24 			ring[idx + 2] = obj[i + 2];
25 			ring[idx + 3] = obj[i + 3];
26 			ring[idx + 4] = obj[i + 4];
27 			ring[idx + 5] = obj[i + 5];
28 			ring[idx + 6] = obj[i + 6];
29 			ring[idx + 7] = obj[i + 7];
30 		}
31 		switch (n & 0x7) {
32 		case 7:
33 			ring[idx++] = obj[i++]; /* fallthrough */
34 		case 6:
35 			ring[idx++] = obj[i++]; /* fallthrough */
36 		case 5:
37 			ring[idx++] = obj[i++]; /* fallthrough */
38 		case 4:
39 			ring[idx++] = obj[i++]; /* fallthrough */
40 		case 3:
41 			ring[idx++] = obj[i++]; /* fallthrough */
42 		case 2:
43 			ring[idx++] = obj[i++]; /* fallthrough */
44 		case 1:
45 			ring[idx++] = obj[i++]; /* fallthrough */
46 		}
47 	} else {
48 		for (i = 0; idx < size; i++, idx++)
49 			ring[idx] = obj[i];
50 		/* Start at the beginning */
51 		for (idx = 0; i < n; i++, idx++)
52 			ring[idx] = obj[i];
53 	}
54 }
55 
56 static __rte_always_inline void
__rte_ring_enqueue_elems_64(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t n)57 __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
58 		const void *obj_table, uint32_t n)
59 {
60 	unsigned int i;
61 	const uint32_t size = r->size;
62 	uint32_t idx = prod_head & r->mask;
63 	uint64_t *ring = (uint64_t *)&r[1];
64 	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
65 	if (likely(idx + n <= size)) {
66 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
67 			ring[idx] = obj[i];
68 			ring[idx + 1] = obj[i + 1];
69 			ring[idx + 2] = obj[i + 2];
70 			ring[idx + 3] = obj[i + 3];
71 		}
72 		switch (n & 0x3) {
73 		case 3:
74 			ring[idx++] = obj[i++]; /* fallthrough */
75 		case 2:
76 			ring[idx++] = obj[i++]; /* fallthrough */
77 		case 1:
78 			ring[idx++] = obj[i++];
79 		}
80 	} else {
81 		for (i = 0; idx < size; i++, idx++)
82 			ring[idx] = obj[i];
83 		/* Start at the beginning */
84 		for (idx = 0; i < n; i++, idx++)
85 			ring[idx] = obj[i];
86 	}
87 }
88 
89 static __rte_always_inline void
__rte_ring_enqueue_elems_128(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t n)90 __rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
91 		const void *obj_table, uint32_t n)
92 {
93 	unsigned int i;
94 	const uint32_t size = r->size;
95 	uint32_t idx = prod_head & r->mask;
96 	rte_int128_t *ring = (rte_int128_t *)&r[1];
97 	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
98 	if (likely(idx + n <= size)) {
99 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
100 			memcpy((void *)(ring + idx),
101 				(const void *)(obj + i), 32);
102 		switch (n & 0x1) {
103 		case 1:
104 			memcpy((void *)(ring + idx),
105 				(const void *)(obj + i), 16);
106 		}
107 	} else {
108 		for (i = 0; idx < size; i++, idx++)
109 			memcpy((void *)(ring + idx),
110 				(const void *)(obj + i), 16);
111 		/* Start at the beginning */
112 		for (idx = 0; i < n; i++, idx++)
113 			memcpy((void *)(ring + idx),
114 				(const void *)(obj + i), 16);
115 	}
116 }
117 
118 /* the actual enqueue of elements on the ring.
119  * Placed here since identical code needed in both
120  * single and multi producer enqueue functions.
121  */
122 static __rte_always_inline void
__rte_ring_enqueue_elems(struct rte_ring * r,uint32_t prod_head,const void * obj_table,uint32_t esize,uint32_t num)123 __rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
124 		const void *obj_table, uint32_t esize, uint32_t num)
125 {
126 	/* 8B and 16B copies implemented individually to retain
127 	 * the current performance.
128 	 */
129 	if (esize == 8)
130 		__rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
131 	else if (esize == 16)
132 		__rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
133 	else {
134 		uint32_t idx, scale, nr_idx, nr_num, nr_size;
135 
136 		/* Normalize to uint32_t */
137 		scale = esize / sizeof(uint32_t);
138 		nr_num = num * scale;
139 		idx = prod_head & r->mask;
140 		nr_idx = idx * scale;
141 		nr_size = r->size * scale;
142 		__rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
143 				obj_table, nr_num);
144 	}
145 }
146 
147 static __rte_always_inline void
__rte_ring_dequeue_elems_32(struct rte_ring * r,const uint32_t size,uint32_t idx,void * obj_table,uint32_t n)148 __rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
149 		uint32_t idx, void *obj_table, uint32_t n)
150 {
151 	unsigned int i;
152 	uint32_t *ring = (uint32_t *)&r[1];
153 	uint32_t *obj = (uint32_t *)obj_table;
154 	if (likely(idx + n <= size)) {
155 		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
156 			obj[i] = ring[idx];
157 			obj[i + 1] = ring[idx + 1];
158 			obj[i + 2] = ring[idx + 2];
159 			obj[i + 3] = ring[idx + 3];
160 			obj[i + 4] = ring[idx + 4];
161 			obj[i + 5] = ring[idx + 5];
162 			obj[i + 6] = ring[idx + 6];
163 			obj[i + 7] = ring[idx + 7];
164 		}
165 		switch (n & 0x7) {
166 		case 7:
167 			obj[i++] = ring[idx++]; /* fallthrough */
168 		case 6:
169 			obj[i++] = ring[idx++]; /* fallthrough */
170 		case 5:
171 			obj[i++] = ring[idx++]; /* fallthrough */
172 		case 4:
173 			obj[i++] = ring[idx++]; /* fallthrough */
174 		case 3:
175 			obj[i++] = ring[idx++]; /* fallthrough */
176 		case 2:
177 			obj[i++] = ring[idx++]; /* fallthrough */
178 		case 1:
179 			obj[i++] = ring[idx++]; /* fallthrough */
180 		}
181 	} else {
182 		for (i = 0; idx < size; i++, idx++)
183 			obj[i] = ring[idx];
184 		/* Start at the beginning */
185 		for (idx = 0; i < n; i++, idx++)
186 			obj[i] = ring[idx];
187 	}
188 }
189 
190 static __rte_always_inline void
__rte_ring_dequeue_elems_64(struct rte_ring * r,uint32_t prod_head,void * obj_table,uint32_t n)191 __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
192 		void *obj_table, uint32_t n)
193 {
194 	unsigned int i;
195 	const uint32_t size = r->size;
196 	uint32_t idx = prod_head & r->mask;
197 	uint64_t *ring = (uint64_t *)&r[1];
198 	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
199 	if (likely(idx + n <= size)) {
200 		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
201 			obj[i] = ring[idx];
202 			obj[i + 1] = ring[idx + 1];
203 			obj[i + 2] = ring[idx + 2];
204 			obj[i + 3] = ring[idx + 3];
205 		}
206 		switch (n & 0x3) {
207 		case 3:
208 			obj[i++] = ring[idx++]; /* fallthrough */
209 		case 2:
210 			obj[i++] = ring[idx++]; /* fallthrough */
211 		case 1:
212 			obj[i++] = ring[idx++]; /* fallthrough */
213 		}
214 	} else {
215 		for (i = 0; idx < size; i++, idx++)
216 			obj[i] = ring[idx];
217 		/* Start at the beginning */
218 		for (idx = 0; i < n; i++, idx++)
219 			obj[i] = ring[idx];
220 	}
221 }
222 
223 static __rte_always_inline void
__rte_ring_dequeue_elems_128(struct rte_ring * r,uint32_t prod_head,void * obj_table,uint32_t n)224 __rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
225 		void *obj_table, uint32_t n)
226 {
227 	unsigned int i;
228 	const uint32_t size = r->size;
229 	uint32_t idx = prod_head & r->mask;
230 	rte_int128_t *ring = (rte_int128_t *)&r[1];
231 	rte_int128_t *obj = (rte_int128_t *)obj_table;
232 	if (likely(idx + n <= size)) {
233 		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
234 			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
235 		switch (n & 0x1) {
236 		case 1:
237 			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
238 		}
239 	} else {
240 		for (i = 0; idx < size; i++, idx++)
241 			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
242 		/* Start at the beginning */
243 		for (idx = 0; i < n; i++, idx++)
244 			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
245 	}
246 }
247 
/* the actual dequeue of elements from the ring.
 * Placed here since identical code needed in both
 * single and multi consumer dequeue functions.
 */
252 static __rte_always_inline void
__rte_ring_dequeue_elems(struct rte_ring * r,uint32_t cons_head,void * obj_table,uint32_t esize,uint32_t num)253 __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
254 		void *obj_table, uint32_t esize, uint32_t num)
255 {
256 	/* 8B and 16B copies implemented individually to retain
257 	 * the current performance.
258 	 */
259 	if (esize == 8)
260 		__rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
261 	else if (esize == 16)
262 		__rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
263 	else {
264 		uint32_t idx, scale, nr_idx, nr_num, nr_size;
265 
266 		/* Normalize to uint32_t */
267 		scale = esize / sizeof(uint32_t);
268 		nr_num = num * scale;
269 		idx = cons_head & r->mask;
270 		nr_idx = idx * scale;
271 		nr_size = r->size * scale;
272 		__rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
273 				obj_table, nr_num);
274 	}
275 }
276 
/* Between one load and the next, the CPU may reorder accesses on weakly
 * ordered memory models (PowerPC/Arm).
 * There are two choices for the user:
 * 1. use an rmb() memory barrier
 * 2. use one-directional load_acquire/store_release barriers
 * Which is preferable depends on performance test results.
 */
284 #ifdef RTE_USE_C11_MEM_MODEL
285 #include "rte_ring_c11_pvt.h"
286 #else
287 #include "rte_ring_generic_pvt.h"
288 #endif
289 
290 /**
291  * @internal Enqueue several objects on the ring
292  *
293  * @param r
294  *   A pointer to the ring structure.
295  * @param obj_table
296  *   A pointer to a table of objects.
297  * @param esize
298  *   The size of ring element, in bytes. It must be a multiple of 4.
299  *   This must be the same value used while creating the ring. Otherwise
300  *   the results are undefined.
301  * @param n
302  *   The number of objects to add in the ring from the obj_table.
303  * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
306  * @param is_sp
307  *   Indicates whether to use single producer or multi-producer head update
308  * @param free_space
309  *   returns the amount of space after the enqueue operation has finished
310  * @return
311  *   Actual number of objects enqueued.
312  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
313  */
314 static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring * r,const void * obj_table,unsigned int esize,unsigned int n,enum rte_ring_queue_behavior behavior,unsigned int is_sp,unsigned int * free_space)315 __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
316 		unsigned int esize, unsigned int n,
317 		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
318 		unsigned int *free_space)
319 {
320 	uint32_t prod_head, prod_next;
321 	uint32_t free_entries;
322 
323 	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
324 			&prod_head, &prod_next, &free_entries);
325 	if (n == 0)
326 		goto end;
327 
328 	__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
329 
330 	__rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
331 end:
332 	if (free_space != NULL)
333 		*free_space = free_entries - n;
334 	return n;
335 }
336 
337 /**
338  * @internal Dequeue several objects from the ring
339  *
340  * @param r
341  *   A pointer to the ring structure.
342  * @param obj_table
343  *   A pointer to a table of objects.
344  * @param esize
345  *   The size of ring element, in bytes. It must be a multiple of 4.
346  *   This must be the same value used while creating the ring. Otherwise
347  *   the results are undefined.
348  * @param n
349  *   The number of objects to pull from the ring.
350  * @param behavior
351  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
352  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
353  * @param is_sc
354  *   Indicates whether to use single consumer or multi-consumer head update
355  * @param available
356  *   returns the number of remaining ring entries after the dequeue has finished
357  * @return
358  *   - Actual number of objects dequeued.
359  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
360  */
361 static __rte_always_inline unsigned int
__rte_ring_do_dequeue_elem(struct rte_ring * r,void * obj_table,unsigned int esize,unsigned int n,enum rte_ring_queue_behavior behavior,unsigned int is_sc,unsigned int * available)362 __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
363 		unsigned int esize, unsigned int n,
364 		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
365 		unsigned int *available)
366 {
367 	uint32_t cons_head, cons_next;
368 	uint32_t entries;
369 
370 	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
371 			&cons_head, &cons_next, &entries);
372 	if (n == 0)
373 		goto end;
374 
375 	__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
376 
377 	__rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
378 
379 end:
380 	if (available != NULL)
381 		*available = entries - n;
382 	return n;
383 }
384 
385 #endif /* _RTE_RING_ELEM_PVT_H_ */
386