/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from the producer/consumer
 * queues described by Dmitry Vyukov and published at
 * http://www.1024cores.net
 *
 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

#ifndef LTHREAD_QUEUE_H_
#define LTHREAD_QUEUE_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <string.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>

#include "lthread_int.h"
#include "lthread.h"
#include "lthread_diag.h"
#include "lthread_pool.h"

struct lthread_queue;

/*
 * This file implements an unbounded FIFO queue based on a lock-free
 * linked list.
 *
 * The queue is non-intrusive in that it uses intermediate nodes, and does
 * not require a link node to be embedded in the objects being placed
 * in the queue.
 *
 * This is slightly more efficient than the very similar queue in lthread_pool
 * in that it does not have to swing a stub node as the queue becomes empty.
 *
 * The queue access functions allocate and free the intermediate nodes
 * transparently from/to a per-scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
 */
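
/*
 * Illustrative usage (a minimal sketch only; the queue name "work" and the
 * payload are hypothetical, and the real call sites in the lthread scheduler
 * may differ):
 *
 *	struct lthread_queue *q = _lthread_queue_create("work");
 *
 *	_lthread_queue_insert_mp(q, payload);     (any producer thread)
 *	void *out = _lthread_queue_remove(q);     (the single consumer)
 *
 *	_lthread_queue_destroy(q);                (returns -1 unless the
 *	                                           queue is empty)
 */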

/*
 * Define a queue of lthread nodes.
 *
 * Producers insert new nodes at 'head' and the single consumer removes them
 * via 'tail'. 'tail' is cache-line aligned so that the producer-written and
 * consumer-written fields live on separate cache lines and do not false-share.
 */
struct lthread_queue {
	struct qnode *head;
	struct qnode *tail __rte_cache_aligned;
	struct lthread_queue *p;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(size);

} __rte_cache_aligned;

static inline struct lthread_queue *
_lthread_queue_create(const char *name)
{
	struct qnode *stub;
	struct lthread_queue *new_queue;

	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
	if (new_queue == NULL)
		return NULL;

	/* allocate stub node */
	stub = _qnode_alloc();
	RTE_ASSERT(stub);

	if (name != NULL)
		strncpy(new_queue->name, name, sizeof(new_queue->name));
	new_queue->name[sizeof(new_queue->name)-1] = 0;

	/* initialize queue as empty */
	stub->next = NULL;
	new_queue->head = stub;
	new_queue->tail = stub;

	DIAG_COUNT_INIT(new_queue, rd);
	DIAG_COUNT_INIT(new_queue, wr);
	DIAG_COUNT_INIT(new_queue, size);

	return new_queue;
}

/**
 * Return true if the queue is empty.
 */
static __rte_always_inline int
_lthread_queue_empty(struct lthread_queue *q)
{
	return q->tail == q->head;
}

/**
 * Destroy a queue.
 * Fails if the queue is not empty.
 */
static inline int _lthread_queue_destroy(struct lthread_queue *q)
{
	if (q == NULL)
		return -1;

	if (!_lthread_queue_empty(q))
		return -1;

	_qnode_free(q->head);
	rte_free(q);
	return 0;
}

RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);

/*
 * Insert a node into a queue.
 * This implementation is multi-producer safe.
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_mp(struct lthread_queue *q, void *data)
{
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set object in node */
	n->data = data;
	n->next = NULL;

	/* this is an MPSC method, perform a locked update:
	 * atomically swing 'head' to the new node; the exchange returns
	 * the previous head
	 */
	prev = (struct qnode *)__sync_lock_test_and_set((uint64_t *)&q->head,
							(uint64_t)n);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
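
/*
 * Note on the MP insert above: the atomic exchange serialises concurrent
 * producers. Each producer first publishes its node as the new 'head' and
 * then links it behind the node returned by the exchange, so every node
 * gains exactly one successor. Between the exchange and the 'prev->next = n'
 * store the list is briefly broken; that is the window the consumer-side
 * retry in _lthread_queue_remove() covers.
 */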

/*
 * Insert a node into a queue in single-producer mode.
 * This implementation is NOT multi-producer safe.
 */
static __rte_always_inline struct qnode *
_lthread_queue_insert_sp(struct lthread_queue *q, void *data)
{
	/* allocate a queue node */
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set data in node */
	n->data = data;
	n->next = NULL;

	/* this is an SPSC method, no need for a locked exchange operation */
	prev = q->head;
	prev->next = q->head = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
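
/*
 * Note on the SP insert above: because 'head' is updated with plain stores,
 * this variant is only safe when a single producer context (for example the
 * owning scheduler) inserts into the queue; concurrent producers must use
 * _lthread_queue_insert_mp() or nodes can be lost.
 */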

/*
 * Remove a node from a queue (single poll attempt).
 * Returns the data, or NULL if the queue appears empty.
 */
static __rte_always_inline void *
_lthread_queue_poll(struct lthread_queue *q)
{
	void *data = NULL;
	struct qnode *tail = q->tail;
	struct qnode *next = (struct qnode *)tail->next;
	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time.
	 * The consumer must handle this by retrying.
	 */

	if (likely(next != NULL)) {
		q->tail = next;
		tail->data = next->data;
		data = tail->data;

		/* free the node */
		_qnode_free(tail);

		DIAG_COUNT_INC(q, rd);
		DIAG_COUNT_DEC(q, size);
		return data;
	}
	return NULL;
}

/*
 * Remove a node from a queue, retrying while the queue still appears
 * non-empty. Returns the data, or NULL if the queue is empty.
 */
static __rte_always_inline void *
_lthread_queue_remove(struct lthread_queue *q)
{
	void *data = NULL;

	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time. We handle this by retrying.
	 */
	do {
		data = _lthread_queue_poll(q);

		if (likely(data != NULL)) {

			DIAG_COUNT_INC(q, rd);
			DIAG_COUNT_DEC(q, size);
			return data;
		}
		rte_compiler_barrier();
	} while (unlikely(!_lthread_queue_empty(q)));
	return NULL;
}
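
/*
 * Illustrative consumer loop (a sketch only; handle_item() is a hypothetical
 * callback, and the actual scheduler drain loops may differ):
 *
 *	void *item;
 *
 *	while ((item = _lthread_queue_remove(q)) != NULL)
 *		handle_item(item);
 *
 * Only one thread, the queue's consumer, may run such a loop; producers on
 * other lcores keep inserting with _lthread_queue_insert_mp().
 */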

#ifdef __cplusplus
}
#endif

#endif				/* LTHREAD_QUEUE_H_ */