1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy [email protected]
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_PEEK_H_
11 #define _RTE_RING_PEEK_H_
12 
13 /**
14  * @file
15  * @b EXPERIMENTAL: this API may change without prior notice
16  * It is not recommended to include this file directly.
17  * Please include <rte_ring_elem.h> instead.
18  *
19  * Ring Peek API
 * The introduction of rte_ring with serialized producer/consumer (HTS sync
 * mode) makes it possible to split the public enqueue/dequeue API into two phases:
22  * - enqueue/dequeue start
23  * - enqueue/dequeue finish
24  * That allows user to inspect objects in the ring without removing them
25  * from it (aka MT safe peek).
26  * Note that right now this new API is available only for two sync modes:
27  * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST)
28  * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS).
 * It is the user's responsibility to create/init the ring with the
 * appropriate sync modes selected.
31  * As an example:
32  * // read 1 elem from the ring:
33  * n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL);
34  * if (n != 0) {
35  *    //examine object
36  *    if (object_examine(obj) == KEEP)
37  *       //decided to keep it in the ring.
38  *       rte_ring_dequeue_finish(ring, 0);
39  *    else
40  *       //decided to remove it from the ring.
41  *       rte_ring_dequeue_finish(ring, n);
42  * }
 * Note that between _start_ and _finish_ no other thread can proceed
 * with an enqueue (or dequeue) operation until _finish_ completes.
45  */
46 
47 #ifdef __cplusplus
48 extern "C" {
49 #endif
50 
51 #include <rte_ring_peek_c11_mem.h>
52 
53 /**
54  * @internal This function moves prod head value.
55  */
56 static __rte_always_inline unsigned int
__rte_ring_do_enqueue_start(struct rte_ring * r,uint32_t n,enum rte_ring_queue_behavior behavior,uint32_t * free_space)57 __rte_ring_do_enqueue_start(struct rte_ring *r, uint32_t n,
58 		enum rte_ring_queue_behavior behavior, uint32_t *free_space)
59 {
60 	uint32_t free, head, next;
61 
62 	switch (r->prod.sync_type) {
63 	case RTE_RING_SYNC_ST:
64 		n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n,
65 			behavior, &head, &next, &free);
66 		break;
67 	case RTE_RING_SYNC_MT_HTS:
68 		n =  __rte_ring_hts_move_prod_head(r, n, behavior,
69 			&head, &free);
70 		break;
71 	case RTE_RING_SYNC_MT:
72 	case RTE_RING_SYNC_MT_RTS:
73 	default:
74 		/* unsupported mode, shouldn't be here */
75 		RTE_ASSERT(0);
76 		n = 0;
77 		free = 0;
78 	}
79 
80 	if (free_space != NULL)
81 		*free_space = free - n;
82 	return n;
83 }
84 
85 /**
86  * Start to enqueue several objects on the ring.
87  * Note that no actual objects are put in the queue by this function,
88  * it just reserves for user such ability.
89  * User has to call appropriate enqueue_elem_finish() to copy objects into the
90  * queue and complete given enqueue operation.
91  *
92  * @param r
93  *   A pointer to the ring structure.
94  * @param n
95  *   The number of objects to add in the ring from the obj_table.
96  * @param free_space
97  *   if non-NULL, returns the amount of space in the ring after the
98  *   enqueue operation has finished.
99  * @return
100  *   The number of objects that can be enqueued, either 0 or n
101  */
102 __rte_experimental
103 static __rte_always_inline unsigned int
rte_ring_enqueue_bulk_elem_start(struct rte_ring * r,unsigned int n,unsigned int * free_space)104 rte_ring_enqueue_bulk_elem_start(struct rte_ring *r, unsigned int n,
105 		unsigned int *free_space)
106 {
107 	return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_FIXED,
108 			free_space);
109 }
110 
111 /**
112  * Start to enqueue several objects on the ring.
113  * Note that no actual objects are put in the queue by this function,
114  * it just reserves for user such ability.
115  * User has to call appropriate enqueue_finish() to copy objects into the
116  * queue and complete given enqueue operation.
117  *
118  * @param r
119  *   A pointer to the ring structure.
120  * @param n
121  *   The number of objects to add in the ring from the obj_table.
122  * @param free_space
123  *   if non-NULL, returns the amount of space in the ring after the
124  *   enqueue operation has finished.
125  * @return
126  *   The number of objects that can be enqueued, either 0 or n
127  */
128 __rte_experimental
129 static __rte_always_inline unsigned int
rte_ring_enqueue_bulk_start(struct rte_ring * r,unsigned int n,unsigned int * free_space)130 rte_ring_enqueue_bulk_start(struct rte_ring *r, unsigned int n,
131 		unsigned int *free_space)
132 {
133 	return rte_ring_enqueue_bulk_elem_start(r, n, free_space);
134 }
135 
136 /**
137  * Start to enqueue several objects on the ring.
138  * Note that no actual objects are put in the queue by this function,
139  * it just reserves for user such ability.
140  * User has to call appropriate enqueue_elem_finish() to copy objects into the
141  * queue and complete given enqueue operation.
142  *
143  * @param r
144  *   A pointer to the ring structure.
145  * @param n
146  *   The number of objects to add in the ring from the obj_table.
147  * @param free_space
148  *   if non-NULL, returns the amount of space in the ring after the
149  *   enqueue operation has finished.
150  * @return
151  *   Actual number of objects that can be enqueued.
152  */
153 __rte_experimental
154 static __rte_always_inline unsigned int
rte_ring_enqueue_burst_elem_start(struct rte_ring * r,unsigned int n,unsigned int * free_space)155 rte_ring_enqueue_burst_elem_start(struct rte_ring *r, unsigned int n,
156 		unsigned int *free_space)
157 {
158 	return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_VARIABLE,
159 			free_space);
160 }
161 
162 /**
163  * Start to enqueue several objects on the ring.
164  * Note that no actual objects are put in the queue by this function,
165  * it just reserves for user such ability.
166  * User has to call appropriate enqueue_finish() to copy objects into the
167  * queue and complete given enqueue operation.
168  *
169  * @param r
170  *   A pointer to the ring structure.
171  * @param n
172  *   The number of objects to add in the ring from the obj_table.
173  * @param free_space
174  *   if non-NULL, returns the amount of space in the ring after the
175  *   enqueue operation has finished.
176  * @return
177  *   Actual number of objects that can be enqueued.
178  */
179 __rte_experimental
180 static __rte_always_inline unsigned int
rte_ring_enqueue_burst_start(struct rte_ring * r,unsigned int n,unsigned int * free_space)181 rte_ring_enqueue_burst_start(struct rte_ring *r, unsigned int n,
182 		unsigned int *free_space)
183 {
184 	return rte_ring_enqueue_burst_elem_start(r, n, free_space);
185 }
186 
187 /**
188  * Complete to enqueue several objects on the ring.
189  * Note that number of objects to enqueue should not exceed previous
190  * enqueue_start return value.
191  *
192  * @param r
193  *   A pointer to the ring structure.
194  * @param obj_table
195  *   A pointer to a table of objects.
196  * @param esize
197  *   The size of ring element, in bytes. It must be a multiple of 4.
198  *   This must be the same value used while creating the ring. Otherwise
199  *   the results are undefined.
200  * @param n
201  *   The number of objects to add to the ring from the obj_table.
202  */
203 __rte_experimental
204 static __rte_always_inline void
rte_ring_enqueue_elem_finish(struct rte_ring * r,const void * obj_table,unsigned int esize,unsigned int n)205 rte_ring_enqueue_elem_finish(struct rte_ring *r, const void *obj_table,
206 		unsigned int esize, unsigned int n)
207 {
208 	uint32_t tail;
209 
210 	switch (r->prod.sync_type) {
211 	case RTE_RING_SYNC_ST:
212 		n = __rte_ring_st_get_tail(&r->prod, &tail, n);
213 		if (n != 0)
214 			__rte_ring_enqueue_elems(r, tail, obj_table, esize, n);
215 		__rte_ring_st_set_head_tail(&r->prod, tail, n, 1);
216 		break;
217 	case RTE_RING_SYNC_MT_HTS:
218 		n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n);
219 		if (n != 0)
220 			__rte_ring_enqueue_elems(r, tail, obj_table, esize, n);
221 		__rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1);
222 		break;
223 	case RTE_RING_SYNC_MT:
224 	case RTE_RING_SYNC_MT_RTS:
225 	default:
226 		/* unsupported mode, shouldn't be here */
227 		RTE_ASSERT(0);
228 	}
229 }
230 
231 /**
232  * Complete to enqueue several objects on the ring.
233  * Note that number of objects to enqueue should not exceed previous
234  * enqueue_start return value.
235  *
236  * @param r
237  *   A pointer to the ring structure.
238  * @param obj_table
239  *   A pointer to a table of objects.
240  * @param n
241  *   The number of objects to add to the ring from the obj_table.
242  */
243 __rte_experimental
244 static __rte_always_inline void
rte_ring_enqueue_finish(struct rte_ring * r,void * const * obj_table,unsigned int n)245 rte_ring_enqueue_finish(struct rte_ring *r, void * const *obj_table,
246 		unsigned int n)
247 {
248 	rte_ring_enqueue_elem_finish(r, obj_table, sizeof(uintptr_t), n);
249 }
250 
251 /**
252  * @internal This function moves cons head value and copies up to *n*
253  * objects from the ring to the user provided obj_table.
254  */
255 static __rte_always_inline unsigned int
__rte_ring_do_dequeue_start(struct rte_ring * r,void * obj_table,uint32_t esize,uint32_t n,enum rte_ring_queue_behavior behavior,uint32_t * available)256 __rte_ring_do_dequeue_start(struct rte_ring *r, void *obj_table,
257 	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
258 	uint32_t *available)
259 {
260 	uint32_t avail, head, next;
261 
262 	switch (r->cons.sync_type) {
263 	case RTE_RING_SYNC_ST:
264 		n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n,
265 			behavior, &head, &next, &avail);
266 		break;
267 	case RTE_RING_SYNC_MT_HTS:
268 		n =  __rte_ring_hts_move_cons_head(r, n, behavior,
269 			&head, &avail);
270 		break;
271 	case RTE_RING_SYNC_MT:
272 	case RTE_RING_SYNC_MT_RTS:
273 	default:
274 		/* unsupported mode, shouldn't be here */
275 		RTE_ASSERT(0);
276 		n = 0;
277 		avail = 0;
278 	}
279 
280 	if (n != 0)
281 		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
282 
283 	if (available != NULL)
284 		*available = avail - n;
285 	return n;
286 }
287 
288 /**
289  * Start to dequeue several objects from the ring.
290  * Note that user has to call appropriate dequeue_finish()
291  * to complete given dequeue operation and actually remove objects the ring.
292  *
293  * @param r
294  *   A pointer to the ring structure.
295  * @param obj_table
296  *   A pointer to a table of objects that will be filled.
297  * @param esize
298  *   The size of ring element, in bytes. It must be a multiple of 4.
299  *   This must be the same value used while creating the ring. Otherwise
300  *   the results are undefined.
301  * @param n
302  *   The number of objects to dequeue from the ring to the obj_table.
303  * @param available
304  *   If non-NULL, returns the number of remaining ring entries after the
305  *   dequeue has finished.
306  * @return
307  *   The number of objects dequeued, either 0 or n.
308  */
309 __rte_experimental
310 static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_elem_start(struct rte_ring * r,void * obj_table,unsigned int esize,unsigned int n,unsigned int * available)311 rte_ring_dequeue_bulk_elem_start(struct rte_ring *r, void *obj_table,
312 		unsigned int esize, unsigned int n, unsigned int *available)
313 {
314 	return __rte_ring_do_dequeue_start(r, obj_table, esize, n,
315 			RTE_RING_QUEUE_FIXED, available);
316 }
317 
/**
 * Start to dequeue several objects from the ring.
 * Note that user has to call appropriate dequeue_finish()
 * to complete given dequeue operation and actually remove objects from
 * the ring.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of void * pointers (objects) that will be filled.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects dequeued, either 0 or n (this wrapper uses the
 *   fixed-size bulk path, which reserves all n objects or none).
 */
__rte_experimental
static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_start(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return rte_ring_dequeue_bulk_elem_start(r, obj_table, sizeof(uintptr_t),
		n, available);
}
343 
344 /**
345  * Start to dequeue several objects from the ring.
346  * Note that user has to call appropriate dequeue_finish()
347  * to complete given dequeue operation and actually remove objects the ring.
348  *
349  * @param r
350  *   A pointer to the ring structure.
351  * @param obj_table
352  *   A pointer to a table of objects that will be filled.
353  * @param esize
354  *   The size of ring element, in bytes. It must be a multiple of 4.
355  *   This must be the same value used while creating the ring. Otherwise
356  *   the results are undefined.
357  * @param n
358  *   The number of objects to dequeue from the ring to the obj_table.
359  * @param available
360  *   If non-NULL, returns the number of remaining ring entries after the
361  *   dequeue has finished.
362  * @return
363  *   The actual number of objects dequeued.
364  */
365 __rte_experimental
366 static __rte_always_inline unsigned int
rte_ring_dequeue_burst_elem_start(struct rte_ring * r,void * obj_table,unsigned int esize,unsigned int n,unsigned int * available)367 rte_ring_dequeue_burst_elem_start(struct rte_ring *r, void *obj_table,
368 		unsigned int esize, unsigned int n, unsigned int *available)
369 {
370 	return __rte_ring_do_dequeue_start(r, obj_table, esize, n,
371 			RTE_RING_QUEUE_VARIABLE, available);
372 }
373 
374 /**
375  * Start to dequeue several objects from the ring.
376  * Note that user has to call appropriate dequeue_finish()
377  * to complete given dequeue operation and actually remove objects the ring.
378  *
379  * @param r
380  *   A pointer to the ring structure.
381  * @param obj_table
382  *   A pointer to a table of void * pointers (objects) that will be filled.
383  * @param n
384  *   The number of objects to dequeue from the ring to the obj_table.
385  * @param available
386  *   If non-NULL, returns the number of remaining ring entries after the
387  *   dequeue has finished.
388  * @return
389  *   The actual number of objects dequeued.
390  */
391 __rte_experimental
392 static __rte_always_inline unsigned int
rte_ring_dequeue_burst_start(struct rte_ring * r,void ** obj_table,unsigned int n,unsigned int * available)393 rte_ring_dequeue_burst_start(struct rte_ring *r, void **obj_table,
394 		unsigned int n, unsigned int *available)
395 {
396 	return rte_ring_dequeue_burst_elem_start(r, obj_table,
397 		sizeof(uintptr_t), n, available);
398 }
399 
400 /**
401  * Complete to dequeue several objects from the ring.
402  * Note that number of objects to dequeue should not exceed previous
403  * dequeue_start return value.
404  *
405  * @param r
406  *   A pointer to the ring structure.
407  * @param n
408  *   The number of objects to remove from the ring.
409  */
410 __rte_experimental
411 static __rte_always_inline void
rte_ring_dequeue_elem_finish(struct rte_ring * r,unsigned int n)412 rte_ring_dequeue_elem_finish(struct rte_ring *r, unsigned int n)
413 {
414 	uint32_t tail;
415 
416 	switch (r->cons.sync_type) {
417 	case RTE_RING_SYNC_ST:
418 		n = __rte_ring_st_get_tail(&r->cons, &tail, n);
419 		__rte_ring_st_set_head_tail(&r->cons, tail, n, 0);
420 		break;
421 	case RTE_RING_SYNC_MT_HTS:
422 		n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n);
423 		__rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0);
424 		break;
425 	case RTE_RING_SYNC_MT:
426 	case RTE_RING_SYNC_MT_RTS:
427 	default:
428 		/* unsupported mode, shouldn't be here */
429 		RTE_ASSERT(0);
430 	}
431 }
432 
433 /**
434  * Complete to dequeue several objects from the ring.
435  * Note that number of objects to dequeue should not exceed previous
436  * dequeue_start return value.
437  *
438  * @param r
439  *   A pointer to the ring structure.
440  * @param n
441  *   The number of objects to remove from the ring.
442  */
443 __rte_experimental
444 static __rte_always_inline void
rte_ring_dequeue_finish(struct rte_ring * r,unsigned int n)445 rte_ring_dequeue_finish(struct rte_ring *r, unsigned int n)
446 {
447 	rte_ring_dequeue_elem_finish(r, n);
448 }
449 
450 #ifdef __cplusplus
451 }
452 #endif
453 
454 #endif /* _RTE_RING_PEEK_H_ */
455