1 /*
2  *-
3  *   BSD LICENSE
4  *
5  *   Copyright(c) 2015 Intel Corporation. All rights reserved.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 /*
36  * Some portions of this software is derived from the producer
37  * consumer queues described by Dmitry Vyukov and published  here
38  * http://www.1024cores.net
39  *
40  * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41  * Redistribution and use in source and binary forms, with or without
42  * modification, are permitted provided that the following conditions
43  * are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  * this list of conditions and the following disclaimer.
47  *
48  * 2. Redistributions in binary form must reproduce the above copyright notice,
49  * this list of conditions and the following disclaimer in the documentation
50  * and/or other materials provided with the distribution.
51  *
52  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
63  *
64  * The views and conclusions contained in the software and documentation are
65  * those of the authors and should not be interpreted as representing official
66  * policies, either expressed or implied, of Dmitry Vyukov.
67  */
68 
69 #ifndef LTHREAD_QUEUE_H_
70 #define LTHREAD_QUEUE_H_
71 
72 #ifdef __cplusplus
73 extern "C" {
74 #endif
75 
76 #include <string.h>
77 
78 #include <rte_prefetch.h>
79 #include <rte_per_lcore.h>
80 
81 #include "lthread_int.h"
82 #include "lthread.h"
83 #include "lthread_diag.h"
84 #include "lthread_pool.h"
85 
86 struct lthread_queue;
87 
88 /*
89  * This file implements an unbounded FIFO queue based on a lock free
90  * linked list.
91  *
92  * The queue is non-intrusive in that it uses intermediate nodes, and does
93  * not require these nodes to be inserted into the object being placed
94  * in the queue.
95  *
96  * This is slightly more efficient than the very similar queue in lthread_pool
97  * in that it does not have to swing a stub node as the queue becomes empty.
98  *
99  * The queue access functions allocate and free intermediate node
100  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
101  *
102  * The queue provides both MPSC and SPSC insert methods
103  */
104 
105 /*
106  * define a queue of lthread nodes
107  */
108 struct lthread_queue {
109 	struct qnode *head;
110 	struct qnode *tail __rte_cache_aligned;
111 	struct lthread_queue *p;
112 	char name[LT_MAX_NAME_SIZE];
113 
114 	DIAG_COUNT_DEFINE(rd);
115 	DIAG_COUNT_DEFINE(wr);
116 	DIAG_COUNT_DEFINE(size);
117 
118 } __rte_cache_aligned;
119 
120 
121 
122 static inline struct lthread_queue *
123 _lthread_queue_create(const char *name)
124 {
125 	struct qnode *stub;
126 	struct lthread_queue *new_queue;
127 
128 	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
129 					RTE_CACHE_LINE_SIZE,
130 					rte_socket_id());
131 	if (new_queue == NULL)
132 		return NULL;
133 
134 	/* allocated stub node */
135 	stub = _qnode_alloc();
136 	RTE_ASSERT(stub);
137 
138 	if (name != NULL)
139 		strncpy(new_queue->name, name, sizeof(new_queue->name));
140 	new_queue->name[sizeof(new_queue->name)-1] = 0;
141 
142 	/* initialize queue as empty */
143 	stub->next = NULL;
144 	new_queue->head = stub;
145 	new_queue->tail = stub;
146 
147 	DIAG_COUNT_INIT(new_queue, rd);
148 	DIAG_COUNT_INIT(new_queue, wr);
149 	DIAG_COUNT_INIT(new_queue, size);
150 
151 	return new_queue;
152 }
153 
154 /**
155  * Return true if the queue is empty
156  */
157 static __rte_always_inline int
158 _lthread_queue_empty(struct lthread_queue *q)
159 {
160 	return q->tail == q->head;
161 }
162 
163 
164 
165 /**
166  * Destroy a queue
167  * fail if queue is not empty
168  */
169 static inline int _lthread_queue_destroy(struct lthread_queue *q)
170 {
171 	if (q == NULL)
172 		return -1;
173 
174 	if (!_lthread_queue_empty(q))
175 		return -1;
176 
177 	_qnode_free(q->head);
178 	rte_free(q);
179 	return 0;
180 }
181 
182 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
183 
184 /*
185  * Insert a node into a queue
186  * this implementation is multi producer safe
187  */
188 static __rte_always_inline struct qnode *
189 _lthread_queue_insert_mp(struct lthread_queue
190 							  *q, void *data)
191 {
192 	struct qnode *prev;
193 	struct qnode *n = _qnode_alloc();
194 
195 	if (n == NULL)
196 		return NULL;
197 
198 	/* set object in node */
199 	n->data = data;
200 	n->next = NULL;
201 
202 	/* this is an MPSC method, perform a locked update */
203 	prev = n;
204 	prev =
205 	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
206 					       (uint64_t) prev);
207 	/* there is a window of inconsistency until prev next is set,
208 	 * which is why remove must retry
209 	 */
210 	prev->next = n;
211 
212 	DIAG_COUNT_INC(q, wr);
213 	DIAG_COUNT_INC(q, size);
214 
215 	return n;
216 }
217 
218 /*
219  * Insert an node into a queue in single producer mode
220  * this implementation is NOT mult producer safe
221  */
222 static __rte_always_inline struct qnode *
223 _lthread_queue_insert_sp(struct lthread_queue
224 							  *q, void *data)
225 {
226 	/* allocate a queue node */
227 	struct qnode *prev;
228 	struct qnode *n = _qnode_alloc();
229 
230 	if (n == NULL)
231 		return NULL;
232 
233 	/* set data in node */
234 	n->data = data;
235 	n->next = NULL;
236 
237 	/* this is an SPSC method, no need for locked exchange operation */
238 	prev = q->head;
239 	prev->next = q->head = n;
240 
241 	DIAG_COUNT_INC(q, wr);
242 	DIAG_COUNT_INC(q, size);
243 
244 	return n;
245 }
246 
247 /*
248  * Remove a node from a queue
249  */
250 static __rte_always_inline void *
251 _lthread_queue_poll(struct lthread_queue *q)
252 {
253 	void *data = NULL;
254 	struct qnode *tail = q->tail;
255 	struct qnode *next = (struct qnode *)tail->next;
256 	/*
257 	 * There is a small window of inconsistency between producer and
258 	 * consumer whereby the queue may appear empty if consumer and
259 	 * producer access it at the same time.
260 	 * The consumer must handle this by retrying
261 	 */
262 
263 	if (likely(next != NULL)) {
264 		q->tail = next;
265 		tail->data = next->data;
266 		data = tail->data;
267 
268 		/* free the node */
269 		_qnode_free(tail);
270 
271 		DIAG_COUNT_INC(q, rd);
272 		DIAG_COUNT_DEC(q, size);
273 		return data;
274 	}
275 	return NULL;
276 }
277 
278 /*
279  * Remove a node from a queue
280  */
281 static __rte_always_inline void *
282 _lthread_queue_remove(struct lthread_queue *q)
283 {
284 	void *data = NULL;
285 
286 	/*
287 	 * There is a small window of inconsistency between producer and
288 	 * consumer whereby the queue may appear empty if consumer and
289 	 * producer access it at the same time. We handle this by retrying
290 	 */
291 	do {
292 		data = _lthread_queue_poll(q);
293 
294 		if (likely(data != NULL)) {
295 
296 			DIAG_COUNT_INC(q, rd);
297 			DIAG_COUNT_DEC(q, size);
298 			return data;
299 		}
300 		rte_compiler_barrier();
301 	} while (unlikely(!_lthread_queue_empty(q)));
302 	return NULL;
303 }
304 
305 #ifdef __cplusplus
306 }
307 #endif
308 
309 #endif				/* LTHREAD_QUEUE_H_ */
310