1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2020 Arm Limited
4  * Copyright (c) 2007-2009 Kip Macy [email protected]
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_PEEK_ZC_H_
11 #define _RTE_RING_PEEK_ZC_H_
12 
13 /**
14  * @file
15  * @b EXPERIMENTAL: this API may change without prior notice
16  * It is not recommended to include this file directly.
17  * Please include <rte_ring_elem.h> instead.
18  *
19  * Ring Peek Zero Copy APIs
20  * These APIs make it possible to split public enqueue/dequeue API
21  * into 3 parts:
22  * - enqueue/dequeue start
23  * - copy data to/from the ring
24  * - enqueue/dequeue finish
25  * Along with the advantages of the peek APIs, these APIs provide the ability
26  * to avoid copying of the data to temporary area (for ex: array of mbufs
27  * on the stack).
28  *
29  * Note that currently these APIs are available only for two sync modes:
30  * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
31  * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
32  * It is user's responsibility to create/init ring with appropriate sync
33  * modes selected.
34  *
35  * Following are some examples showing the API usage.
36  * 1)
37  * struct elem_obj {uint64_t a; uint32_t b, c;};
38  * struct elem_obj *obj;
39  *
40  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
41  * // Reserve space on the ring
42  * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(elem_obj), 1, &zcd, NULL);
43  *
44  * // Produce the data directly on the ring memory
45  * obj = (struct elem_obj *)zcd->ptr1;
46  * obj->a = rte_get_a();
47  * obj->b = rte_get_b();
48  * obj->c = rte_get_c();
 * rte_ring_enqueue_zc_elem_finish(r, n);
50  *
51  * 2)
52  * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
53  * // Reserve space on the ring
54  * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
55  *
56  * // Pkt I/O core polls packets from the NIC
57  * if (n != 0) {
58  *	nb_rx = rte_eth_rx_burst(portid, queueid, zcd->ptr1, zcd->n1);
59  *	if (nb_rx == zcd->n1 && n != zcd->n1)
60  *		nb_rx = rte_eth_rx_burst(portid, queueid,
61  *						zcd->ptr2, n - zcd->n1);
62  *
63  *	// Provide packets to the packet processing cores
64  *	rte_ring_enqueue_zc_finish(r, nb_rx);
65  * }
66  *
 * Note that between _start_ and _finish_ no other thread can proceed
 * with an enqueue/dequeue operation until _finish_ completes.
69  */
70 
71 #ifdef __cplusplus
72 extern "C" {
73 #endif
74 
75 #include <rte_ring_peek_c11_mem.h>
76 
77 /**
78  * Ring zero-copy information structure.
79  *
80  * This structure contains the pointers and length of the space
81  * reserved on the ring storage.
82  */
struct rte_ring_zc_data {
	/** Pointer to the first contiguous space reserved on the ring. */
	void *ptr1;
	/**
	 * Pointer to the second space in the ring if there is wrap-around.
	 * It contains a valid value only if wrap-around happens.
	 */
	void *ptr2;
	/**
	 * Number of elements available at ptr1. If this is equal to
	 * the number of elements requested, then ptr2 is NULL.
	 * Otherwise, subtracting n1 from the number of elements requested
	 * gives the number of elements available at ptr2.
	 */
	unsigned int n1;
} __rte_cache_aligned;
97 
98 static __rte_always_inline void
__rte_ring_get_elem_addr(struct rte_ring * r,uint32_t head,uint32_t esize,uint32_t num,void ** dst1,uint32_t * n1,void ** dst2)99 __rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
100 	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
101 {
102 	uint32_t idx, scale, nr_idx;
103 	uint32_t *ring = (uint32_t *)&r[1];
104 
105 	/* Normalize to uint32_t */
106 	scale = esize / sizeof(uint32_t);
107 	idx = head & r->mask;
108 	nr_idx = idx * scale;
109 
110 	*dst1 = ring + nr_idx;
111 	*n1 = num;
112 
113 	if (idx + num > r->size) {
114 		*n1 = r->size - idx;
115 		*dst2 = ring;
116 	} else {
117 		*dst2 = NULL;
118 	}
119 }
120 
121 /**
122  * @internal This function moves prod head value.
123  */
124 static __rte_always_inline unsigned int
__rte_ring_do_enqueue_zc_elem_start(struct rte_ring * r,unsigned int esize,uint32_t n,enum rte_ring_queue_behavior behavior,struct rte_ring_zc_data * zcd,unsigned int * free_space)125 __rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
126 		uint32_t n, enum rte_ring_queue_behavior behavior,
127 		struct rte_ring_zc_data *zcd, unsigned int *free_space)
128 {
129 	uint32_t free, head, next;
130 
131 	switch (r->prod.sync_type) {
132 	case RTE_RING_SYNC_ST:
133 		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
134 			behavior, &head, &next, &free);
135 		break;
136 	case RTE_RING_SYNC_MT_HTS:
137 		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
138 		break;
139 	case RTE_RING_SYNC_MT:
140 	case RTE_RING_SYNC_MT_RTS:
141 	default:
142 		/* unsupported mode, shouldn't be here */
143 		RTE_ASSERT(0);
144 		n = 0;
145 		free = 0;
146 		return n;
147 	}
148 
149 	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
150 		&zcd->n1, &zcd->ptr2);
151 
152 	if (free_space != NULL)
153 		*free_space = free - n;
154 	return n;
155 }
156 
157 /**
158  * Start to enqueue several objects on the ring.
159  * Note that no actual objects are put in the queue by this function,
160  * it just reserves space for the user on the ring.
161  * User has to copy objects into the queue using the returned pointers.
162  * User should call rte_ring_enqueue_zc_elem_finish to complete the
163  * enqueue operation.
164  *
165  * @param r
166  *   A pointer to the ring structure.
167  * @param esize
168  *   The size of ring element, in bytes. It must be a multiple of 4.
169  * @param n
170  *   The number of objects to add in the ring.
171  * @param zcd
172  *   Structure containing the pointers and length of the space
173  *   reserved on the ring storage.
174  * @param free_space
175  *   If non-NULL, returns the amount of space in the ring after the
176  *   reservation operation has finished.
177  * @return
178  *   The number of objects that can be enqueued, either 0 or n
179  */
180 __rte_experimental
181 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)182 rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
183 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
184 {
185 	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
186 			RTE_RING_QUEUE_FIXED, zcd, free_space);
187 }
188 
189 /**
190  * Start to enqueue several pointers to objects on the ring.
191  * Note that no actual pointers are put in the queue by this function,
192  * it just reserves space for the user on the ring.
193  * User has to copy pointers to objects into the queue using the
194  * returned pointers.
195  * User should call rte_ring_enqueue_zc_finish to complete the
196  * enqueue operation.
197  *
198  * @param r
199  *   A pointer to the ring structure.
200  * @param n
201  *   The number of objects to add in the ring.
202  * @param zcd
203  *   Structure containing the pointers and length of the space
204  *   reserved on the ring storage.
205  * @param free_space
206  *   If non-NULL, returns the amount of space in the ring after the
207  *   reservation operation has finished.
208  * @return
209  *   The number of objects that can be enqueued, either 0 or n
210  */
211 __rte_experimental
212 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_start(struct rte_ring * r,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)213 rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
214 	struct rte_ring_zc_data *zcd, unsigned int *free_space)
215 {
216 	return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
217 							zcd, free_space);
218 }
219 
220 /**
221  * Start to enqueue several objects on the ring.
222  * Note that no actual objects are put in the queue by this function,
223  * it just reserves space for the user on the ring.
224  * User has to copy objects into the queue using the returned pointers.
225  * User should call rte_ring_enqueue_zc_elem_finish to complete the
226  * enqueue operation.
227  *
228  * @param r
229  *   A pointer to the ring structure.
230  * @param esize
231  *   The size of ring element, in bytes. It must be a multiple of 4.
232  * @param n
233  *   The number of objects to add in the ring.
234  * @param zcd
235  *   Structure containing the pointers and length of the space
236  *   reserved on the ring storage.
237  * @param free_space
238  *   If non-NULL, returns the amount of space in the ring after the
239  *   reservation operation has finished.
240  * @return
241  *   The number of objects that can be enqueued, either 0 or n
242  */
243 __rte_experimental
244 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)245 rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
246 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
247 {
248 	return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
249 			RTE_RING_QUEUE_VARIABLE, zcd, free_space);
250 }
251 
252 /**
253  * Start to enqueue several pointers to objects on the ring.
254  * Note that no actual pointers are put in the queue by this function,
255  * it just reserves space for the user on the ring.
256  * User has to copy pointers to objects into the queue using the
257  * returned pointers.
258  * User should call rte_ring_enqueue_zc_finish to complete the
259  * enqueue operation.
260  *
261  * @param r
262  *   A pointer to the ring structure.
263  * @param n
264  *   The number of objects to add in the ring.
265  * @param zcd
266  *   Structure containing the pointers and length of the space
267  *   reserved on the ring storage.
268  * @param free_space
269  *   If non-NULL, returns the amount of space in the ring after the
270  *   reservation operation has finished.
271  * @return
272  *   The number of objects that can be enqueued, either 0 or n.
273  */
274 __rte_experimental
275 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_start(struct rte_ring * r,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)276 rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
277 	struct rte_ring_zc_data *zcd, unsigned int *free_space)
278 {
279 	return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
280 							zcd, free_space);
281 }
282 
283 /**
284  * Complete enqueuing several objects on the ring.
285  * Note that number of objects to enqueue should not exceed previous
286  * enqueue_start return value.
287  *
288  * @param r
289  *   A pointer to the ring structure.
290  * @param n
291  *   The number of objects to add to the ring.
292  */
293 __rte_experimental
294 static __rte_always_inline void
rte_ring_enqueue_zc_elem_finish(struct rte_ring * r,unsigned int n)295 rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
296 {
297 	uint32_t tail;
298 
299 	switch (r->prod.sync_type) {
300 	case RTE_RING_SYNC_ST:
301 		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
302 		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
303 		break;
304 	case RTE_RING_SYNC_MT_HTS:
305 		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
306 		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
307 		break;
308 	case RTE_RING_SYNC_MT:
309 	case RTE_RING_SYNC_MT_RTS:
310 	default:
311 		/* unsupported mode, shouldn't be here */
312 		RTE_ASSERT(0);
313 	}
314 }
315 
316 /**
317  * Complete enqueuing several pointers to objects on the ring.
318  * Note that number of objects to enqueue should not exceed previous
319  * enqueue_start return value.
320  *
321  * @param r
322  *   A pointer to the ring structure.
323  * @param n
324  *   The number of pointers to objects to add to the ring.
325  */
326 __rte_experimental
327 static __rte_always_inline void
rte_ring_enqueue_zc_finish(struct rte_ring * r,unsigned int n)328 rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
329 {
330 	rte_ring_enqueue_zc_elem_finish(r, n);
331 }
332 
333 /**
334  * @internal This function moves cons head value and copies up to *n*
335  * objects from the ring to the user provided obj_table.
336  */
337 static __rte_always_inline unsigned int
__rte_ring_do_dequeue_zc_elem_start(struct rte_ring * r,uint32_t esize,uint32_t n,enum rte_ring_queue_behavior behavior,struct rte_ring_zc_data * zcd,unsigned int * available)338 __rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
339 	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
340 	struct rte_ring_zc_data *zcd, unsigned int *available)
341 {
342 	uint32_t avail, head, next;
343 
344 	switch (r->cons.sync_type) {
345 	case RTE_RING_SYNC_ST:
346 		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
347 			behavior, &head, &next, &avail);
348 		break;
349 	case RTE_RING_SYNC_MT_HTS:
350 		n = __rte_ring_hts_move_cons_head(r, n, behavior,
351 			&head, &avail);
352 		break;
353 	case RTE_RING_SYNC_MT:
354 	case RTE_RING_SYNC_MT_RTS:
355 	default:
356 		/* unsupported mode, shouldn't be here */
357 		RTE_ASSERT(0);
358 		n = 0;
359 		avail = 0;
360 		return n;
361 	}
362 
363 	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
364 		&zcd->n1, &zcd->ptr2);
365 
366 	if (available != NULL)
367 		*available = avail - n;
368 	return n;
369 }
370 
371 /**
372  * Start to dequeue several objects from the ring.
373  * Note that no actual objects are copied from the queue by this function.
374  * User has to copy objects from the queue using the returned pointers.
375  * User should call rte_ring_dequeue_zc_elem_finish to complete the
376  * dequeue operation.
377  *
378  * @param r
379  *   A pointer to the ring structure.
380  * @param esize
381  *   The size of ring element, in bytes. It must be a multiple of 4.
382  * @param n
383  *   The number of objects to remove from the ring.
384  * @param zcd
385  *   Structure containing the pointers and length of the space
386  *   reserved on the ring storage.
387  * @param available
388  *   If non-NULL, returns the number of remaining ring entries after the
389  *   dequeue has finished.
390  * @return
391  *   The number of objects that can be dequeued, either 0 or n.
392  */
393 __rte_experimental
394 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)395 rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
396 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
397 {
398 	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
399 			RTE_RING_QUEUE_FIXED, zcd, available);
400 }
401 
402 /**
403  * Start to dequeue several pointers to objects from the ring.
404  * Note that no actual pointers are removed from the queue by this function.
405  * User has to copy pointers to objects from the queue using the
406  * returned pointers.
407  * User should call rte_ring_dequeue_zc_finish to complete the
408  * dequeue operation.
409  *
410  * @param r
411  *   A pointer to the ring structure.
412  * @param n
413  *   The number of objects to remove from the ring.
414  * @param zcd
415  *   Structure containing the pointers and length of the space
416  *   reserved on the ring storage.
417  * @param available
418  *   If non-NULL, returns the number of remaining ring entries after the
419  *   dequeue has finished.
420  * @return
421  *   The number of objects that can be dequeued, either 0 or n.
422  */
423 __rte_experimental
424 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_start(struct rte_ring * r,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)425 rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
426 	struct rte_ring_zc_data *zcd, unsigned int *available)
427 {
428 	return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
429 		n, zcd, available);
430 }
431 
432 /**
433  * Start to dequeue several objects from the ring.
434  * Note that no actual objects are copied from the queue by this function.
435  * User has to copy objects from the queue using the returned pointers.
436  * User should call rte_ring_dequeue_zc_elem_finish to complete the
437  * dequeue operation.
438  *
439  * @param r
440  *   A pointer to the ring structure.
441  * @param esize
442  *   The size of ring element, in bytes. It must be a multiple of 4.
443  *   This must be the same value used while creating the ring. Otherwise
444  *   the results are undefined.
445  * @param n
446  *   The number of objects to dequeue from the ring.
447  * @param zcd
448  *   Structure containing the pointers and length of the space
449  *   reserved on the ring storage.
450  * @param available
451  *   If non-NULL, returns the number of remaining ring entries after the
452  *   dequeue has finished.
453  * @return
454  *   The number of objects that can be dequeued, either 0 or n.
455  */
456 __rte_experimental
457 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)458 rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
459 	unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
460 {
461 	return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
462 			RTE_RING_QUEUE_VARIABLE, zcd, available);
463 }
464 
465 /**
466  * Start to dequeue several pointers to objects from the ring.
467  * Note that no actual pointers are removed from the queue by this function.
468  * User has to copy pointers to objects from the queue using the
469  * returned pointers.
470  * User should call rte_ring_dequeue_zc_finish to complete the
471  * dequeue operation.
472  *
473  * @param r
474  *   A pointer to the ring structure.
475  * @param n
476  *   The number of objects to remove from the ring.
477  * @param zcd
478  *   Structure containing the pointers and length of the space
479  *   reserved on the ring storage.
480  * @param available
481  *   If non-NULL, returns the number of remaining ring entries after the
482  *   dequeue has finished.
483  * @return
484  *   The number of objects that can be dequeued, either 0 or n.
485  */
486 __rte_experimental
487 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_start(struct rte_ring * r,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)488 rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
489 		struct rte_ring_zc_data *zcd, unsigned int *available)
490 {
491 	return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
492 			zcd, available);
493 }
494 
495 /**
496  * Complete dequeuing several objects from the ring.
 * Note that the number of objects to dequeue should not exceed the
 * previous dequeue_start return value.
499  *
500  * @param r
501  *   A pointer to the ring structure.
502  * @param n
503  *   The number of objects to remove from the ring.
504  */
505 __rte_experimental
506 static __rte_always_inline void
rte_ring_dequeue_zc_elem_finish(struct rte_ring * r,unsigned int n)507 rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
508 {
509 	uint32_t tail;
510 
511 	switch (r->cons.sync_type) {
512 	case RTE_RING_SYNC_ST:
513 		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
514 		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
515 		break;
516 	case RTE_RING_SYNC_MT_HTS:
517 		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
518 		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
519 		break;
520 	case RTE_RING_SYNC_MT:
521 	case RTE_RING_SYNC_MT_RTS:
522 	default:
523 		/* unsupported mode, shouldn't be here */
524 		RTE_ASSERT(0);
525 	}
526 }
527 
528 /**
529  * Complete dequeuing several objects from the ring.
 * Note that the number of objects to dequeue should not exceed the
 * previous dequeue_start return value.
532  *
533  * @param r
534  *   A pointer to the ring structure.
535  * @param n
536  *   The number of objects to remove from the ring.
537  */
538 __rte_experimental
539 static __rte_always_inline void
rte_ring_dequeue_zc_finish(struct rte_ring * r,unsigned int n)540 rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
541 {
542 	rte_ring_dequeue_elem_finish(r, n);
543 }
544 
545 #ifdef __cplusplus
546 }
547 #endif
548 
549 #endif /* _RTE_RING_PEEK_ZC_H_ */
550