1*a9643ea8Slogwang /*
2*a9643ea8Slogwang  *-
3*a9643ea8Slogwang  *   BSD LICENSE
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  *   Copyright(c) 2015 Intel Corporation. All rights reserved.
6*a9643ea8Slogwang  *   All rights reserved.
7*a9643ea8Slogwang  *
8*a9643ea8Slogwang  *   Redistribution and use in source and binary forms, with or without
9*a9643ea8Slogwang  *   modification, are permitted provided that the following conditions
10*a9643ea8Slogwang  *   are met:
11*a9643ea8Slogwang  *
12*a9643ea8Slogwang  *     * Redistributions of source code must retain the above copyright
13*a9643ea8Slogwang  *       notice, this list of conditions and the following disclaimer.
14*a9643ea8Slogwang  *     * Redistributions in binary form must reproduce the above copyright
15*a9643ea8Slogwang  *       notice, this list of conditions and the following disclaimer in
16*a9643ea8Slogwang  *       the documentation and/or other materials provided with the
17*a9643ea8Slogwang  *       distribution.
18*a9643ea8Slogwang  *     * Neither the name of Intel Corporation nor the names of its
19*a9643ea8Slogwang  *       contributors may be used to endorse or promote products derived
20*a9643ea8Slogwang  *       from this software without specific prior written permission.
21*a9643ea8Slogwang  *
22*a9643ea8Slogwang  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23*a9643ea8Slogwang  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24*a9643ea8Slogwang  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25*a9643ea8Slogwang  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26*a9643ea8Slogwang  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27*a9643ea8Slogwang  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28*a9643ea8Slogwang  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29*a9643ea8Slogwang  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30*a9643ea8Slogwang  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31*a9643ea8Slogwang  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32*a9643ea8Slogwang  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33*a9643ea8Slogwang  */
34*a9643ea8Slogwang 
35*a9643ea8Slogwang /*
 * Some portions of this software are derived from the producer
 * consumer queues described by Dmitry Vyukov and published here
38*a9643ea8Slogwang  * http://www.1024cores.net
39*a9643ea8Slogwang  *
40*a9643ea8Slogwang  * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41*a9643ea8Slogwang  * Redistribution and use in source and binary forms, with or without
42*a9643ea8Slogwang  * modification, are permitted provided that the following conditions
43*a9643ea8Slogwang  * are met:
44*a9643ea8Slogwang  *
45*a9643ea8Slogwang  * 1. Redistributions of source code must retain the above copyright notice,
46*a9643ea8Slogwang  * this list of conditions and the following disclaimer.
47*a9643ea8Slogwang  *
48*a9643ea8Slogwang  * 2. Redistributions in binary form must reproduce the above copyright notice,
49*a9643ea8Slogwang  * this list of conditions and the following disclaimer in the documentation
50*a9643ea8Slogwang  * and/or other materials provided with the distribution.
51*a9643ea8Slogwang  *
52*a9643ea8Slogwang  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53*a9643ea8Slogwang  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54*a9643ea8Slogwang  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55*a9643ea8Slogwang  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56*a9643ea8Slogwang  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57*a9643ea8Slogwang  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58*a9643ea8Slogwang  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59*a9643ea8Slogwang  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60*a9643ea8Slogwang  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61*a9643ea8Slogwang  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62*a9643ea8Slogwang  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
63*a9643ea8Slogwang  *
64*a9643ea8Slogwang  * The views and conclusions contained in the software and documentation are
65*a9643ea8Slogwang  * those of the authors and should not be interpreted as representing official
66*a9643ea8Slogwang  * policies, either expressed or implied, of Dmitry Vyukov.
67*a9643ea8Slogwang  */
68*a9643ea8Slogwang 
69*a9643ea8Slogwang #ifndef LTHREAD_QUEUE_H_
70*a9643ea8Slogwang #define LTHREAD_QUEUE_H_
71*a9643ea8Slogwang 
72*a9643ea8Slogwang #include <string.h>
73*a9643ea8Slogwang 
74*a9643ea8Slogwang #include <rte_prefetch.h>
75*a9643ea8Slogwang #include <rte_per_lcore.h>
76*a9643ea8Slogwang 
77*a9643ea8Slogwang #include "lthread_int.h"
78*a9643ea8Slogwang #include "lthread.h"
79*a9643ea8Slogwang #include "lthread_diag.h"
80*a9643ea8Slogwang #include "lthread_pool.h"
81*a9643ea8Slogwang 
82*a9643ea8Slogwang struct lthread_queue;
83*a9643ea8Slogwang 
84*a9643ea8Slogwang /*
85*a9643ea8Slogwang  * This file implements an unbounded FIFO queue based on a lock free
86*a9643ea8Slogwang  * linked list.
87*a9643ea8Slogwang  *
88*a9643ea8Slogwang  * The queue is non-intrusive in that it uses intermediate nodes, and does
89*a9643ea8Slogwang  * not require these nodes to be inserted into the object being placed
90*a9643ea8Slogwang  * in the queue.
91*a9643ea8Slogwang  *
92*a9643ea8Slogwang  * This is slightly more efficient than the very similar queue in lthread_pool
93*a9643ea8Slogwang  * in that it does not have to swing a stub node as the queue becomes empty.
94*a9643ea8Slogwang  *
95*a9643ea8Slogwang  * The queue access functions allocate and free intermediate node
96*a9643ea8Slogwang  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
97*a9643ea8Slogwang  *
98*a9643ea8Slogwang  * The queue provides both MPSC and SPSC insert methods
99*a9643ea8Slogwang  */
100*a9643ea8Slogwang 
101*a9643ea8Slogwang /*
102*a9643ea8Slogwang  * define a queue of lthread nodes
103*a9643ea8Slogwang  */
struct lthread_queue {
	/* Producer end: inserts atomically swing head to the new node. */
	struct qnode *head;
	/* Consumer end: poll advances tail. Placed on its own cache line
	 * so producer writes to head do not false-share with the consumer.
	 */
	struct qnode *tail __rte_cache_aligned;
	/* NOTE(review): not referenced anywhere in this file — presumably
	 * used by callers elsewhere; confirm before removing.
	 */
	struct lthread_queue *p;
	/* Diagnostic name, always NUL terminated by _lthread_queue_create(). */
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);		/* nodes removed */
	DIAG_COUNT_DEFINE(wr);		/* nodes inserted */
	DIAG_COUNT_DEFINE(size);	/* current occupancy */

} __rte_cache_aligned;
115*a9643ea8Slogwang 
116*a9643ea8Slogwang 
117*a9643ea8Slogwang 
118*a9643ea8Slogwang static inline struct lthread_queue *
119*a9643ea8Slogwang _lthread_queue_create(const char *name)
120*a9643ea8Slogwang {
121*a9643ea8Slogwang 	struct qnode *stub;
122*a9643ea8Slogwang 	struct lthread_queue *new_queue;
123*a9643ea8Slogwang 
124*a9643ea8Slogwang 	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
125*a9643ea8Slogwang 					RTE_CACHE_LINE_SIZE,
126*a9643ea8Slogwang 					rte_socket_id());
127*a9643ea8Slogwang 	if (new_queue == NULL)
128*a9643ea8Slogwang 		return NULL;
129*a9643ea8Slogwang 
130*a9643ea8Slogwang 	/* allocated stub node */
131*a9643ea8Slogwang 	stub = _qnode_alloc();
132*a9643ea8Slogwang 	RTE_ASSERT(stub);
133*a9643ea8Slogwang 
134*a9643ea8Slogwang 	if (name != NULL)
135*a9643ea8Slogwang 		strncpy(new_queue->name, name, sizeof(new_queue->name));
136*a9643ea8Slogwang 	new_queue->name[sizeof(new_queue->name)-1] = 0;
137*a9643ea8Slogwang 
138*a9643ea8Slogwang 	/* initialize queue as empty */
139*a9643ea8Slogwang 	stub->next = NULL;
140*a9643ea8Slogwang 	new_queue->head = stub;
141*a9643ea8Slogwang 	new_queue->tail = stub;
142*a9643ea8Slogwang 
143*a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, rd);
144*a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, wr);
145*a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, size);
146*a9643ea8Slogwang 
147*a9643ea8Slogwang 	return new_queue;
148*a9643ea8Slogwang }
149*a9643ea8Slogwang 
150*a9643ea8Slogwang /**
151*a9643ea8Slogwang  * Return true if the queue is empty
152*a9643ea8Slogwang  */
153*a9643ea8Slogwang static inline int __attribute__ ((always_inline))
154*a9643ea8Slogwang _lthread_queue_empty(struct lthread_queue *q)
155*a9643ea8Slogwang {
156*a9643ea8Slogwang 	return q->tail == q->head;
157*a9643ea8Slogwang }
158*a9643ea8Slogwang 
159*a9643ea8Slogwang 
160*a9643ea8Slogwang 
161*a9643ea8Slogwang /**
162*a9643ea8Slogwang  * Destroy a queue
163*a9643ea8Slogwang  * fail if queue is not empty
164*a9643ea8Slogwang  */
165*a9643ea8Slogwang static inline int _lthread_queue_destroy(struct lthread_queue *q)
166*a9643ea8Slogwang {
167*a9643ea8Slogwang 	if (q == NULL)
168*a9643ea8Slogwang 		return -1;
169*a9643ea8Slogwang 
170*a9643ea8Slogwang 	if (!_lthread_queue_empty(q))
171*a9643ea8Slogwang 		return -1;
172*a9643ea8Slogwang 
173*a9643ea8Slogwang 	_qnode_free(q->head);
174*a9643ea8Slogwang 	rte_free(q);
175*a9643ea8Slogwang 	return 0;
176*a9643ea8Slogwang }
177*a9643ea8Slogwang 
178*a9643ea8Slogwang RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
179*a9643ea8Slogwang 
/*
 * Insert a node into a queue
 * this implementation is multi producer safe
 * (MPSC: any number of producers, a single consumer).
 * Returns the new node, or NULL if the node pool is exhausted.
 */
static inline struct qnode *__attribute__ ((always_inline))
_lthread_queue_insert_mp(struct lthread_queue
							  *q, void *data)
{
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set object in node */
	n->data = data;
	n->next = NULL;

	/* this is an MPSC method, perform a locked update:
	 * atomically exchange the new node into q->head, receiving the
	 * previous head in return.
	 */
	prev = n;
	prev =
	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
					       (uint64_t) prev);
	/* there is a window of inconsistency until prev next is set,
	 * which is why remove must retry
	 */
	prev->next = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
213*a9643ea8Slogwang 
/*
 * Insert a node into a queue in single producer mode
 * this implementation is NOT multi producer safe
 * Returns the new node, or NULL if the node pool is exhausted.
 */
static inline struct qnode *__attribute__ ((always_inline))
_lthread_queue_insert_sp(struct lthread_queue
							  *q, void *data)
{
	/* allocate a queue node */
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set data in node */
	n->data = data;
	n->next = NULL;

	/* this is an SPSC method, no need for locked exchange operation:
	 * link the old head to the new node and advance head in one
	 * statement (head update last, matching the MP variant's ordering).
	 */
	prev = q->head;
	prev->next = q->head = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
242*a9643ea8Slogwang 
/*
 * Remove a node from a queue (single consumer).
 * Returns the dequeued data pointer, or NULL if the queue appears
 * empty — which may be transient, see the retry note below.
 */
static inline void *__attribute__ ((always_inline))
_lthread_queue_poll(struct lthread_queue *q)
{
	void *data = NULL;
	struct qnode *tail = q->tail;
	struct qnode *next = (struct qnode *)tail->next;
	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time.
	 * The consumer must handle this by retrying
	 */

	if (likely(next != NULL)) {
		/* advance tail; the old tail node is recycled and `next`
		 * becomes the new stub carrying the front element's data
		 */
		q->tail = next;
		tail->data = next->data;
		data = tail->data;

		/* free the node */
		_qnode_free(tail);

		DIAG_COUNT_INC(q, rd);
		DIAG_COUNT_DEC(q, size);
		return data;
	}
	return NULL;
}
273*a9643ea8Slogwang 
274*a9643ea8Slogwang /*
275*a9643ea8Slogwang  * Remove a node from a queue
276*a9643ea8Slogwang  */
277*a9643ea8Slogwang static inline void *__attribute__ ((always_inline))
278*a9643ea8Slogwang _lthread_queue_remove(struct lthread_queue *q)
279*a9643ea8Slogwang {
280*a9643ea8Slogwang 	void *data = NULL;
281*a9643ea8Slogwang 
282*a9643ea8Slogwang 	/*
283*a9643ea8Slogwang 	 * There is a small window of inconsistency between producer and
284*a9643ea8Slogwang 	 * consumer whereby the queue may appear empty if consumer and
285*a9643ea8Slogwang 	 * producer access it at the same time. We handle this by retrying
286*a9643ea8Slogwang 	 */
287*a9643ea8Slogwang 	do {
288*a9643ea8Slogwang 		data = _lthread_queue_poll(q);
289*a9643ea8Slogwang 
290*a9643ea8Slogwang 		if (likely(data != NULL)) {
291*a9643ea8Slogwang 
292*a9643ea8Slogwang 			DIAG_COUNT_INC(q, rd);
293*a9643ea8Slogwang 			DIAG_COUNT_DEC(q, size);
294*a9643ea8Slogwang 			return data;
295*a9643ea8Slogwang 		}
296*a9643ea8Slogwang 		rte_compiler_barrier();
297*a9643ea8Slogwang 	} while (unlikely(!_lthread_queue_empty(q)));
298*a9643ea8Slogwang 	return NULL;
299*a9643ea8Slogwang }
300*a9643ea8Slogwang 
301*a9643ea8Slogwang 
302*a9643ea8Slogwang #endif				/* LTHREAD_QUEUE_H_ */
303