1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2017,2018 HXT-semitech Corporation.
4  * Copyright (c) 2007-2009 Kip Macy [email protected]
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_C11_MEM_H_
11 #define _RTE_RING_C11_MEM_H_
12 
13 static __rte_always_inline void
update_tail(struct rte_ring_headtail * ht,uint32_t old_val,uint32_t new_val,uint32_t single,uint32_t enqueue)14 update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
15 		uint32_t single, uint32_t enqueue)
16 {
17 	RTE_SET_USED(enqueue);
18 
19 	/*
20 	 * If there are other enqueues/dequeues in progress that preceded us,
21 	 * we need to wait for them to complete
22 	 */
23 	if (!single)
24 		while (unlikely(ht->tail != old_val))
25 			rte_pause();
26 
27 	__atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
28 }
29 
30 /**
31  * @internal This function updates the producer head for enqueue
32  *
33  * @param r
34  *   A pointer to the ring structure
35  * @param is_sp
36  *   Indicates whether multi-producer path is needed or not
37  * @param n
38  *   The number of elements we will want to enqueue, i.e. how far should the
39  *   head be moved
40  * @param behavior
41  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
42  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
43  * @param old_head
44  *   Returns head value as it was before the move, i.e. where enqueue starts
45  * @param new_head
46  *   Returns the current/new head value i.e. where enqueue finishes
47  * @param free_entries
48  *   Returns the amount of free space in the ring BEFORE head was moved
49  * @return
50  *   Actual number of objects enqueued.
51  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
52  */
53 static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring * r,unsigned int is_sp,unsigned int n,enum rte_ring_queue_behavior behavior,uint32_t * old_head,uint32_t * new_head,uint32_t * free_entries)54 __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
55 		unsigned int n, enum rte_ring_queue_behavior behavior,
56 		uint32_t *old_head, uint32_t *new_head,
57 		uint32_t *free_entries)
58 {
59 	const uint32_t capacity = r->capacity;
60 	uint32_t cons_tail;
61 	unsigned int max = n;
62 	int success;
63 
64 	*old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED);
65 	do {
66 		/* Reset n to the initial burst count */
67 		n = max;
68 
69 		/* Ensure the head is read before tail */
70 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
71 
72 		/* load-acquire synchronize with store-release of ht->tail
73 		 * in update_tail.
74 		 */
75 		cons_tail = __atomic_load_n(&r->cons.tail,
76 					__ATOMIC_ACQUIRE);
77 
78 		/* The subtraction is done between two unsigned 32bits value
79 		 * (the result is always modulo 32 bits even if we have
80 		 * *old_head > cons_tail). So 'free_entries' is always between 0
81 		 * and capacity (which is < size).
82 		 */
83 		*free_entries = (capacity + cons_tail - *old_head);
84 
85 		/* check that we have enough room in ring */
86 		if (unlikely(n > *free_entries))
87 			n = (behavior == RTE_RING_QUEUE_FIXED) ?
88 					0 : *free_entries;
89 
90 		if (n == 0)
91 			return 0;
92 
93 		*new_head = *old_head + n;
94 		if (is_sp)
95 			r->prod.head = *new_head, success = 1;
96 		else
97 			/* on failure, *old_head is updated */
98 			success = __atomic_compare_exchange_n(&r->prod.head,
99 					old_head, *new_head,
100 					0, __ATOMIC_RELAXED,
101 					__ATOMIC_RELAXED);
102 	} while (unlikely(success == 0));
103 	return n;
104 }
105 
106 /**
107  * @internal This function updates the consumer head for dequeue
108  *
109  * @param r
110  *   A pointer to the ring structure
111  * @param is_sc
112  *   Indicates whether multi-consumer path is needed or not
113  * @param n
114  *   The number of elements we will want to enqueue, i.e. how far should the
115  *   head be moved
116  * @param behavior
117  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
118  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
119  * @param old_head
120  *   Returns head value as it was before the move, i.e. where dequeue starts
121  * @param new_head
122  *   Returns the current/new head value i.e. where dequeue finishes
123  * @param entries
124  *   Returns the number of entries in the ring BEFORE head was moved
125  * @return
126  *   - Actual number of objects dequeued.
127  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
128  */
129 static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring * r,int is_sc,unsigned int n,enum rte_ring_queue_behavior behavior,uint32_t * old_head,uint32_t * new_head,uint32_t * entries)130 __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
131 		unsigned int n, enum rte_ring_queue_behavior behavior,
132 		uint32_t *old_head, uint32_t *new_head,
133 		uint32_t *entries)
134 {
135 	unsigned int max = n;
136 	uint32_t prod_tail;
137 	int success;
138 
139 	/* move cons.head atomically */
140 	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
141 	do {
142 		/* Restore n as it may change every loop */
143 		n = max;
144 
145 		/* Ensure the head is read before tail */
146 		__atomic_thread_fence(__ATOMIC_ACQUIRE);
147 
148 		/* this load-acquire synchronize with store-release of ht->tail
149 		 * in update_tail.
150 		 */
151 		prod_tail = __atomic_load_n(&r->prod.tail,
152 					__ATOMIC_ACQUIRE);
153 
154 		/* The subtraction is done between two unsigned 32bits value
155 		 * (the result is always modulo 32 bits even if we have
156 		 * cons_head > prod_tail). So 'entries' is always between 0
157 		 * and size(ring)-1.
158 		 */
159 		*entries = (prod_tail - *old_head);
160 
161 		/* Set the actual entries for dequeue */
162 		if (n > *entries)
163 			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
164 
165 		if (unlikely(n == 0))
166 			return 0;
167 
168 		*new_head = *old_head + n;
169 		if (is_sc)
170 			r->cons.head = *new_head, success = 1;
171 		else
172 			/* on failure, *old_head will be updated */
173 			success = __atomic_compare_exchange_n(&r->cons.head,
174 							old_head, *new_head,
175 							0, __ATOMIC_RELAXED,
176 							__ATOMIC_RELAXED);
177 	} while (unlikely(success == 0));
178 	return n;
179 }
180 
181 #endif /* _RTE_RING_C11_MEM_H_ */
182