1 /* SPDX-License-Identifier: BSD-3-Clause
2 *
3 * Copyright (c) 2020 Arm Limited
4 * Copyright (c) 2007-2009 Kip Macy [email protected]
5 * All rights reserved.
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
8 */
9
10 #ifndef _RTE_RING_PEEK_ZC_H_
11 #define _RTE_RING_PEEK_ZC_H_
12
13 /**
14 * @file
15 * @b EXPERIMENTAL: this API may change without prior notice
16 * It is not recommended to include this file directly.
17 * Please include <rte_ring_elem.h> instead.
18 *
19 * Ring Peek Zero Copy APIs
20 * These APIs make it possible to split public enqueue/dequeue API
21 * into 3 parts:
22 * - enqueue/dequeue start
23 * - copy data to/from the ring
24 * - enqueue/dequeue finish
25 * Along with the advantages of the peek APIs, these APIs provide the ability
26 * to avoid copying of the data to temporary area (for ex: array of mbufs
27 * on the stack).
28 *
29 * Note that currently these APIs are available only for two sync modes:
30 * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
31 * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
32 * It is user's responsibility to create/init ring with appropriate sync
33 * modes selected.
34 *
35 * Following are some examples showing the API usage.
36 * 1)
37 * struct elem_obj {uint64_t a; uint32_t b, c;};
38 * struct elem_obj *obj;
39 *
40 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
41 * // Reserve space on the ring
42 * n = rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(elem_obj), 1, &zcd, NULL);
43 *
44 * // Produce the data directly on the ring memory
45 * obj = (struct elem_obj *)zcd->ptr1;
46 * obj->a = rte_get_a();
47 * obj->b = rte_get_b();
48 * obj->c = rte_get_c();
 * rte_ring_enqueue_zc_elem_finish(r, n);
50 *
51 * 2)
52 * // Create ring with sync type RTE_RING_SYNC_ST or RTE_RING_SYNC_MT_HTS
53 * // Reserve space on the ring
54 * n = rte_ring_enqueue_zc_burst_start(r, 32, &zcd, NULL);
55 *
56 * // Pkt I/O core polls packets from the NIC
57 * if (n != 0) {
58 * nb_rx = rte_eth_rx_burst(portid, queueid, zcd->ptr1, zcd->n1);
59 * if (nb_rx == zcd->n1 && n != zcd->n1)
60 * nb_rx = rte_eth_rx_burst(portid, queueid,
61 * zcd->ptr2, n - zcd->n1);
62 *
63 * // Provide packets to the packet processing cores
64 * rte_ring_enqueue_zc_finish(r, nb_rx);
65 * }
66 *
 * Note that between _start_ and _finish_ no other thread can proceed
 * with enqueue/dequeue operation till _finish_ completes.
69 */
70
71 #ifdef __cplusplus
72 extern "C" {
73 #endif
74
75 #include <rte_ring_peek_c11_mem.h>
76
77 /**
78 * Ring zero-copy information structure.
79 *
80 * This structure contains the pointers and length of the space
81 * reserved on the ring storage.
82 */
struct rte_ring_zc_data {
	/* Pointer to the first contiguous space reserved in the ring. */
	void *ptr1;
	/* Pointer to the second space in the ring if there is wrap-around.
	 * It contains a valid value only if wrap-around happens.
	 */
	void *ptr2;
	/* Number of elements available at ptr1. If this is equal to
	 * the number of elements requested, then ptr2 is NULL.
	 * Otherwise, subtracting n1 from the number of elements requested
	 * gives the number of elements available at ptr2.
	 */
	unsigned int n1;
} __rte_cache_aligned;
97
98 static __rte_always_inline void
__rte_ring_get_elem_addr(struct rte_ring * r,uint32_t head,uint32_t esize,uint32_t num,void ** dst1,uint32_t * n1,void ** dst2)99 __rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
100 uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
101 {
102 uint32_t idx, scale, nr_idx;
103 uint32_t *ring = (uint32_t *)&r[1];
104
105 /* Normalize to uint32_t */
106 scale = esize / sizeof(uint32_t);
107 idx = head & r->mask;
108 nr_idx = idx * scale;
109
110 *dst1 = ring + nr_idx;
111 *n1 = num;
112
113 if (idx + num > r->size) {
114 *n1 = r->size - idx;
115 *dst2 = ring;
116 } else {
117 *dst2 = NULL;
118 }
119 }
120
/**
 * @internal This function moves the prod head value and reserves space
 * for up to n elements. The reserved area is described in *zcd; no data
 * is written to the ring here (zero-copy).
 */
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_zc_elem_start(struct rte_ring *r, unsigned int esize,
	uint32_t n, enum rte_ring_queue_behavior behavior,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	uint32_t free, head, next;

	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
			behavior, &head, &next, &free);
		break;
	case RTE_RING_SYNC_MT_HTS:
		/* HTS serializes producers; no 'next' value is produced. */
		n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		n = 0;
		free = 0;
		return n;
	}

	/* Translate the reserved head range into direct ring pointers. */
	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
		&zcd->n1, &zcd->ptr2);

	if (free_space != NULL)
		*free_space = free - n;
	return n;
}
156
157 /**
158 * Start to enqueue several objects on the ring.
159 * Note that no actual objects are put in the queue by this function,
160 * it just reserves space for the user on the ring.
161 * User has to copy objects into the queue using the returned pointers.
162 * User should call rte_ring_enqueue_zc_elem_finish to complete the
163 * enqueue operation.
164 *
165 * @param r
166 * A pointer to the ring structure.
167 * @param esize
168 * The size of ring element, in bytes. It must be a multiple of 4.
169 * @param n
170 * The number of objects to add in the ring.
171 * @param zcd
172 * Structure containing the pointers and length of the space
173 * reserved on the ring storage.
174 * @param free_space
175 * If non-NULL, returns the amount of space in the ring after the
176 * reservation operation has finished.
177 * @return
178 * The number of objects that can be enqueued, either 0 or n
179 */
180 __rte_experimental
181 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)182 rte_ring_enqueue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
183 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
184 {
185 return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
186 RTE_RING_QUEUE_FIXED, zcd, free_space);
187 }
188
189 /**
190 * Start to enqueue several pointers to objects on the ring.
191 * Note that no actual pointers are put in the queue by this function,
192 * it just reserves space for the user on the ring.
193 * User has to copy pointers to objects into the queue using the
194 * returned pointers.
195 * User should call rte_ring_enqueue_zc_finish to complete the
196 * enqueue operation.
197 *
198 * @param r
199 * A pointer to the ring structure.
200 * @param n
201 * The number of objects to add in the ring.
202 * @param zcd
203 * Structure containing the pointers and length of the space
204 * reserved on the ring storage.
205 * @param free_space
206 * If non-NULL, returns the amount of space in the ring after the
207 * reservation operation has finished.
208 * @return
209 * The number of objects that can be enqueued, either 0 or n
210 */
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_bulk_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	/* Pointer enqueue is the elem variant with pointer-sized elements. */
	return rte_ring_enqueue_zc_bulk_elem_start(r, sizeof(uintptr_t), n,
			zcd, free_space);
}
219
220 /**
221 * Start to enqueue several objects on the ring.
222 * Note that no actual objects are put in the queue by this function,
223 * it just reserves space for the user on the ring.
224 * User has to copy objects into the queue using the returned pointers.
225 * User should call rte_ring_enqueue_zc_elem_finish to complete the
226 * enqueue operation.
227 *
228 * @param r
229 * A pointer to the ring structure.
230 * @param esize
231 * The size of ring element, in bytes. It must be a multiple of 4.
232 * @param n
233 * The number of objects to add in the ring.
234 * @param zcd
235 * Structure containing the pointers and length of the space
236 * reserved on the ring storage.
237 * @param free_space
238 * If non-NULL, returns the amount of space in the ring after the
239 * reservation operation has finished.
240 * @return
241 * The number of objects that can be enqueued, either 0 or n
242 */
243 __rte_experimental
244 static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * free_space)245 rte_ring_enqueue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
246 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *free_space)
247 {
248 return __rte_ring_do_enqueue_zc_elem_start(r, esize, n,
249 RTE_RING_QUEUE_VARIABLE, zcd, free_space);
250 }
251
252 /**
253 * Start to enqueue several pointers to objects on the ring.
254 * Note that no actual pointers are put in the queue by this function,
255 * it just reserves space for the user on the ring.
256 * User has to copy pointers to objects into the queue using the
257 * returned pointers.
258 * User should call rte_ring_enqueue_zc_finish to complete the
259 * enqueue operation.
260 *
261 * @param r
262 * A pointer to the ring structure.
263 * @param n
264 * The number of objects to add in the ring.
265 * @param zcd
266 * Structure containing the pointers and length of the space
267 * reserved on the ring storage.
268 * @param free_space
269 * If non-NULL, returns the amount of space in the ring after the
270 * reservation operation has finished.
271 * @return
272 * The number of objects that can be enqueued, either 0 or n.
273 */
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_enqueue_zc_burst_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *free_space)
{
	/* Pointer enqueue is the elem variant with pointer-sized elements. */
	return rte_ring_enqueue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
			zcd, free_space);
}
282
283 /**
284 * Complete enqueuing several objects on the ring.
285 * Note that number of objects to enqueue should not exceed previous
286 * enqueue_start return value.
287 *
288 * @param r
289 * A pointer to the ring structure.
290 * @param n
291 * The number of objects to add to the ring.
292 */
__rte_experimental
static __rte_always_inline void
rte_ring_enqueue_zc_elem_finish(struct rte_ring *r, unsigned int n)
{
	uint32_t tail;

	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_ST:
		/* Single producer: publish the new head/tail directly. */
		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
		break;
	case RTE_RING_SYNC_MT_HTS:
		/* Serialized producers: update the combined HTS head/tail. */
		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}
315
316 /**
317 * Complete enqueuing several pointers to objects on the ring.
318 * Note that number of objects to enqueue should not exceed previous
319 * enqueue_start return value.
320 *
321 * @param r
322 * A pointer to the ring structure.
323 * @param n
324 * The number of pointers to objects to add to the ring.
325 */
__rte_experimental
static __rte_always_inline void
rte_ring_enqueue_zc_finish(struct rte_ring *r, unsigned int n)
{
	/* The finish step does not depend on element size, so the
	 * pointer variant simply forwards to the elem variant.
	 */
	rte_ring_enqueue_zc_elem_finish(r, n);
}
332
/**
 * @internal This function moves the cons head value and reserves up to
 * n elements for the user to read directly from the ring storage; no
 * data is copied here (zero-copy), the area is described in *zcd.
 */
static __rte_always_inline unsigned int
__rte_ring_do_dequeue_zc_elem_start(struct rte_ring *r,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	uint32_t avail, head, next;

	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_ST:
		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
			behavior, &head, &next, &avail);
		break;
	case RTE_RING_SYNC_MT_HTS:
		/* HTS serializes consumers; no 'next' value is produced. */
		n = __rte_ring_hts_move_cons_head(r, n, behavior,
			&head, &avail);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
		n = 0;
		avail = 0;
		return n;
	}

	/* Translate the reserved head range into direct ring pointers. */
	__rte_ring_get_elem_addr(r, head, esize, n, &zcd->ptr1,
		&zcd->n1, &zcd->ptr2);

	if (available != NULL)
		*available = avail - n;
	return n;
}
370
371 /**
372 * Start to dequeue several objects from the ring.
373 * Note that no actual objects are copied from the queue by this function.
374 * User has to copy objects from the queue using the returned pointers.
375 * User should call rte_ring_dequeue_zc_elem_finish to complete the
376 * dequeue operation.
377 *
378 * @param r
379 * A pointer to the ring structure.
380 * @param esize
381 * The size of ring element, in bytes. It must be a multiple of 4.
382 * @param n
383 * The number of objects to remove from the ring.
384 * @param zcd
385 * Structure containing the pointers and length of the space
386 * reserved on the ring storage.
387 * @param available
388 * If non-NULL, returns the number of remaining ring entries after the
389 * dequeue has finished.
390 * @return
391 * The number of objects that can be dequeued, either 0 or n.
392 */
393 __rte_experimental
394 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)395 rte_ring_dequeue_zc_bulk_elem_start(struct rte_ring *r, unsigned int esize,
396 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
397 {
398 return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
399 RTE_RING_QUEUE_FIXED, zcd, available);
400 }
401
402 /**
403 * Start to dequeue several pointers to objects from the ring.
404 * Note that no actual pointers are removed from the queue by this function.
405 * User has to copy pointers to objects from the queue using the
406 * returned pointers.
407 * User should call rte_ring_dequeue_zc_finish to complete the
408 * dequeue operation.
409 *
410 * @param r
411 * A pointer to the ring structure.
412 * @param n
413 * The number of objects to remove from the ring.
414 * @param zcd
415 * Structure containing the pointers and length of the space
416 * reserved on the ring storage.
417 * @param available
418 * If non-NULL, returns the number of remaining ring entries after the
419 * dequeue has finished.
420 * @return
421 * The number of objects that can be dequeued, either 0 or n.
422 */
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_bulk_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	/* Pointer dequeue is the elem variant with pointer-sized elements. */
	return rte_ring_dequeue_zc_bulk_elem_start(r, sizeof(uintptr_t),
		n, zcd, available);
}
431
432 /**
433 * Start to dequeue several objects from the ring.
434 * Note that no actual objects are copied from the queue by this function.
435 * User has to copy objects from the queue using the returned pointers.
436 * User should call rte_ring_dequeue_zc_elem_finish to complete the
437 * dequeue operation.
438 *
439 * @param r
440 * A pointer to the ring structure.
441 * @param esize
442 * The size of ring element, in bytes. It must be a multiple of 4.
443 * This must be the same value used while creating the ring. Otherwise
444 * the results are undefined.
445 * @param n
446 * The number of objects to dequeue from the ring.
447 * @param zcd
448 * Structure containing the pointers and length of the space
449 * reserved on the ring storage.
450 * @param available
451 * If non-NULL, returns the number of remaining ring entries after the
452 * dequeue has finished.
453 * @return
454 * The number of objects that can be dequeued, either 0 or n.
455 */
456 __rte_experimental
457 static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_elem_start(struct rte_ring * r,unsigned int esize,unsigned int n,struct rte_ring_zc_data * zcd,unsigned int * available)458 rte_ring_dequeue_zc_burst_elem_start(struct rte_ring *r, unsigned int esize,
459 unsigned int n, struct rte_ring_zc_data *zcd, unsigned int *available)
460 {
461 return __rte_ring_do_dequeue_zc_elem_start(r, esize, n,
462 RTE_RING_QUEUE_VARIABLE, zcd, available);
463 }
464
465 /**
466 * Start to dequeue several pointers to objects from the ring.
467 * Note that no actual pointers are removed from the queue by this function.
468 * User has to copy pointers to objects from the queue using the
469 * returned pointers.
470 * User should call rte_ring_dequeue_zc_finish to complete the
471 * dequeue operation.
472 *
473 * @param r
474 * A pointer to the ring structure.
475 * @param n
476 * The number of objects to remove from the ring.
477 * @param zcd
478 * Structure containing the pointers and length of the space
479 * reserved on the ring storage.
480 * @param available
481 * If non-NULL, returns the number of remaining ring entries after the
482 * dequeue has finished.
483 * @return
484 * The number of objects that can be dequeued, either 0 or n.
485 */
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_zc_burst_start(struct rte_ring *r, unsigned int n,
	struct rte_ring_zc_data *zcd, unsigned int *available)
{
	/* Pointer dequeue is the elem variant with pointer-sized elements. */
	return rte_ring_dequeue_zc_burst_elem_start(r, sizeof(uintptr_t), n,
		zcd, available);
}
494
495 /**
496 * Complete dequeuing several objects from the ring.
497 * Note that number of objects to dequeued should not exceed previous
498 * dequeue_start return value.
499 *
500 * @param r
501 * A pointer to the ring structure.
502 * @param n
503 * The number of objects to remove from the ring.
504 */
__rte_experimental
static __rte_always_inline void
rte_ring_dequeue_zc_elem_finish(struct rte_ring *r, unsigned int n)
{
	uint32_t tail;

	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_ST:
		/* Single consumer: publish the new head/tail directly. */
		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
		break;
	case RTE_RING_SYNC_MT_HTS:
		/* Serialized consumers: update the combined HTS head/tail. */
		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
		break;
	case RTE_RING_SYNC_MT:
	case RTE_RING_SYNC_MT_RTS:
	default:
		/* unsupported mode, shouldn't be here */
		RTE_ASSERT(0);
	}
}
527
528 /**
529 * Complete dequeuing several objects from the ring.
530 * Note that number of objects to dequeued should not exceed previous
531 * dequeue_start return value.
532 *
533 * @param r
534 * A pointer to the ring structure.
535 * @param n
536 * The number of objects to remove from the ring.
537 */
538 __rte_experimental
539 static __rte_always_inline void
rte_ring_dequeue_zc_finish(struct rte_ring * r,unsigned int n)540 rte_ring_dequeue_zc_finish(struct rte_ring *r, unsigned int n)
541 {
542 rte_ring_dequeue_elem_finish(r, n);
543 }
544
545 #ifdef __cplusplus
546 }
547 #endif
548
549 #endif /* _RTE_RING_PEEK_ZC_H_ */
550