1a9643ea8Slogwang /*
2*d30ea906Sjfb8856606  * SPDX-License-Identifier: BSD-3-Clause
3*d30ea906Sjfb8856606  * Copyright 2015 Intel Corporation.
4*d30ea906Sjfb8856606  * Copyright 2010-2011 Dmitry Vyukov
5a9643ea8Slogwang  */
6a9643ea8Slogwang 
7a9643ea8Slogwang #ifndef LTHREAD_QUEUE_H_
8a9643ea8Slogwang #define LTHREAD_QUEUE_H_
9a9643ea8Slogwang 
102bfe3f2eSlogwang #ifdef __cplusplus
112bfe3f2eSlogwang extern "C" {
122bfe3f2eSlogwang #endif
132bfe3f2eSlogwang 
14a9643ea8Slogwang #include <string.h>
15a9643ea8Slogwang 
16a9643ea8Slogwang #include <rte_prefetch.h>
17a9643ea8Slogwang #include <rte_per_lcore.h>
18a9643ea8Slogwang 
19a9643ea8Slogwang #include "lthread_int.h"
20a9643ea8Slogwang #include "lthread.h"
21a9643ea8Slogwang #include "lthread_diag.h"
22a9643ea8Slogwang #include "lthread_pool.h"
23a9643ea8Slogwang 
24a9643ea8Slogwang struct lthread_queue;
25a9643ea8Slogwang 
26a9643ea8Slogwang /*
27a9643ea8Slogwang  * This file implements an unbounded FIFO queue based on a lock free
28a9643ea8Slogwang  * linked list.
29a9643ea8Slogwang  *
30a9643ea8Slogwang  * The queue is non-intrusive in that it uses intermediate nodes, and does
31a9643ea8Slogwang  * not require these nodes to be inserted into the object being placed
32a9643ea8Slogwang  * in the queue.
33a9643ea8Slogwang  *
34a9643ea8Slogwang  * This is slightly more efficient than the very similar queue in lthread_pool
35a9643ea8Slogwang  * in that it does not have to swing a stub node as the queue becomes empty.
36a9643ea8Slogwang  *
37a9643ea8Slogwang  * The queue access functions allocate and free intermediate node
38a9643ea8Slogwang  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
39a9643ea8Slogwang  *
40a9643ea8Slogwang  * The queue provides both MPSC and SPSC insert methods
41a9643ea8Slogwang  */
42a9643ea8Slogwang 
/*
 * define a queue of lthread nodes
 *
 * The list always holds at least one node: a freshly created queue has
 * head and tail pointing at the same stub node, and that is also the
 * condition tested to decide that the queue is empty.
 */
struct lthread_queue {
	/* producer end: the insert functions replace this with the new node */
	struct qnode *head;
	/*
	 * consumer end: the poll function advances this to the next node.
	 * NOTE(review): the separate cache alignment presumably keeps the
	 * producer and consumer pointers on different cache lines - confirm.
	 */
	struct qnode *tail __rte_cache_aligned;
	/* not referenced by any function in this header */
	struct lthread_queue *p;
	/* queue name copied at creation time, always NUL terminated */
	char name[LT_MAX_NAME_SIZE];

	/* diagnostic counters, see lthread_diag.h for the macro definitions */
	DIAG_COUNT_DEFINE(rd);		/* incremented on removal */
	DIAG_COUNT_DEFINE(wr);		/* incremented on insertion */
	DIAG_COUNT_DEFINE(size);	/* incremented on insert, decremented on removal */

} __rte_cache_aligned;
57a9643ea8Slogwang 
58a9643ea8Slogwang 
59a9643ea8Slogwang 
60a9643ea8Slogwang static inline struct lthread_queue *
_lthread_queue_create(const char * name)61a9643ea8Slogwang _lthread_queue_create(const char *name)
62a9643ea8Slogwang {
63a9643ea8Slogwang 	struct qnode *stub;
64a9643ea8Slogwang 	struct lthread_queue *new_queue;
65a9643ea8Slogwang 
66a9643ea8Slogwang 	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
67a9643ea8Slogwang 					RTE_CACHE_LINE_SIZE,
68a9643ea8Slogwang 					rte_socket_id());
69a9643ea8Slogwang 	if (new_queue == NULL)
70a9643ea8Slogwang 		return NULL;
71a9643ea8Slogwang 
72a9643ea8Slogwang 	/* allocated stub node */
73a9643ea8Slogwang 	stub = _qnode_alloc();
74a9643ea8Slogwang 	RTE_ASSERT(stub);
75a9643ea8Slogwang 
76a9643ea8Slogwang 	if (name != NULL)
77a9643ea8Slogwang 		strncpy(new_queue->name, name, sizeof(new_queue->name));
78a9643ea8Slogwang 	new_queue->name[sizeof(new_queue->name)-1] = 0;
79a9643ea8Slogwang 
80a9643ea8Slogwang 	/* initialize queue as empty */
81a9643ea8Slogwang 	stub->next = NULL;
82a9643ea8Slogwang 	new_queue->head = stub;
83a9643ea8Slogwang 	new_queue->tail = stub;
84a9643ea8Slogwang 
85a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, rd);
86a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, wr);
87a9643ea8Slogwang 	DIAG_COUNT_INIT(new_queue, size);
88a9643ea8Slogwang 
89a9643ea8Slogwang 	return new_queue;
90a9643ea8Slogwang }
91a9643ea8Slogwang 
92a9643ea8Slogwang /**
93a9643ea8Slogwang  * Return true if the queue is empty
94a9643ea8Slogwang  */
952bfe3f2eSlogwang static __rte_always_inline int
_lthread_queue_empty(struct lthread_queue * q)96a9643ea8Slogwang _lthread_queue_empty(struct lthread_queue *q)
97a9643ea8Slogwang {
98a9643ea8Slogwang 	return q->tail == q->head;
99a9643ea8Slogwang }
100a9643ea8Slogwang 
101a9643ea8Slogwang 
102a9643ea8Slogwang 
103a9643ea8Slogwang /**
104a9643ea8Slogwang  * Destroy a queue
105a9643ea8Slogwang  * fail if queue is not empty
106a9643ea8Slogwang  */
_lthread_queue_destroy(struct lthread_queue * q)107a9643ea8Slogwang static inline int _lthread_queue_destroy(struct lthread_queue *q)
108a9643ea8Slogwang {
109a9643ea8Slogwang 	if (q == NULL)
110a9643ea8Slogwang 		return -1;
111a9643ea8Slogwang 
112a9643ea8Slogwang 	if (!_lthread_queue_empty(q))
113a9643ea8Slogwang 		return -1;
114a9643ea8Slogwang 
115a9643ea8Slogwang 	_qnode_free(q->head);
116a9643ea8Slogwang 	rte_free(q);
117a9643ea8Slogwang 	return 0;
118a9643ea8Slogwang }
119a9643ea8Slogwang 
/*
 * Declaration of the per-lcore variable this_sched (a pointer to
 * struct lthread_sched). It is only declared here; none of the queue
 * functions in this header reference it directly.
 */
RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
121a9643ea8Slogwang 
122a9643ea8Slogwang /*
123a9643ea8Slogwang  * Insert a node into a queue
124a9643ea8Slogwang  * this implementation is multi producer safe
125a9643ea8Slogwang  */
1262bfe3f2eSlogwang static __rte_always_inline struct qnode *
_lthread_queue_insert_mp(struct lthread_queue * q,void * data)127a9643ea8Slogwang _lthread_queue_insert_mp(struct lthread_queue
128a9643ea8Slogwang 							  *q, void *data)
129a9643ea8Slogwang {
130a9643ea8Slogwang 	struct qnode *prev;
131a9643ea8Slogwang 	struct qnode *n = _qnode_alloc();
132a9643ea8Slogwang 
133a9643ea8Slogwang 	if (n == NULL)
134a9643ea8Slogwang 		return NULL;
135a9643ea8Slogwang 
136a9643ea8Slogwang 	/* set object in node */
137a9643ea8Slogwang 	n->data = data;
138a9643ea8Slogwang 	n->next = NULL;
139a9643ea8Slogwang 
140a9643ea8Slogwang 	/* this is an MPSC method, perform a locked update */
141a9643ea8Slogwang 	prev = n;
142a9643ea8Slogwang 	prev =
143a9643ea8Slogwang 	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
144a9643ea8Slogwang 					       (uint64_t) prev);
145a9643ea8Slogwang 	/* there is a window of inconsistency until prev next is set,
146a9643ea8Slogwang 	 * which is why remove must retry
147a9643ea8Slogwang 	 */
148a9643ea8Slogwang 	prev->next = n;
149a9643ea8Slogwang 
150a9643ea8Slogwang 	DIAG_COUNT_INC(q, wr);
151a9643ea8Slogwang 	DIAG_COUNT_INC(q, size);
152a9643ea8Slogwang 
153a9643ea8Slogwang 	return n;
154a9643ea8Slogwang }
155a9643ea8Slogwang 
156a9643ea8Slogwang /*
157a9643ea8Slogwang  * Insert an node into a queue in single producer mode
158a9643ea8Slogwang  * this implementation is NOT mult producer safe
159a9643ea8Slogwang  */
1602bfe3f2eSlogwang static __rte_always_inline struct qnode *
_lthread_queue_insert_sp(struct lthread_queue * q,void * data)161a9643ea8Slogwang _lthread_queue_insert_sp(struct lthread_queue
162a9643ea8Slogwang 							  *q, void *data)
163a9643ea8Slogwang {
164a9643ea8Slogwang 	/* allocate a queue node */
165a9643ea8Slogwang 	struct qnode *prev;
166a9643ea8Slogwang 	struct qnode *n = _qnode_alloc();
167a9643ea8Slogwang 
168a9643ea8Slogwang 	if (n == NULL)
169a9643ea8Slogwang 		return NULL;
170a9643ea8Slogwang 
171a9643ea8Slogwang 	/* set data in node */
172a9643ea8Slogwang 	n->data = data;
173a9643ea8Slogwang 	n->next = NULL;
174a9643ea8Slogwang 
175a9643ea8Slogwang 	/* this is an SPSC method, no need for locked exchange operation */
176a9643ea8Slogwang 	prev = q->head;
177a9643ea8Slogwang 	prev->next = q->head = n;
178a9643ea8Slogwang 
179a9643ea8Slogwang 	DIAG_COUNT_INC(q, wr);
180a9643ea8Slogwang 	DIAG_COUNT_INC(q, size);
181a9643ea8Slogwang 
182a9643ea8Slogwang 	return n;
183a9643ea8Slogwang }
184a9643ea8Slogwang 
185a9643ea8Slogwang /*
186a9643ea8Slogwang  * Remove a node from a queue
187a9643ea8Slogwang  */
1882bfe3f2eSlogwang static __rte_always_inline void *
_lthread_queue_poll(struct lthread_queue * q)189a9643ea8Slogwang _lthread_queue_poll(struct lthread_queue *q)
190a9643ea8Slogwang {
191a9643ea8Slogwang 	void *data = NULL;
192a9643ea8Slogwang 	struct qnode *tail = q->tail;
193a9643ea8Slogwang 	struct qnode *next = (struct qnode *)tail->next;
194a9643ea8Slogwang 	/*
195a9643ea8Slogwang 	 * There is a small window of inconsistency between producer and
196a9643ea8Slogwang 	 * consumer whereby the queue may appear empty if consumer and
197a9643ea8Slogwang 	 * producer access it at the same time.
198a9643ea8Slogwang 	 * The consumer must handle this by retrying
199a9643ea8Slogwang 	 */
200a9643ea8Slogwang 
201a9643ea8Slogwang 	if (likely(next != NULL)) {
202a9643ea8Slogwang 		q->tail = next;
203a9643ea8Slogwang 		tail->data = next->data;
204a9643ea8Slogwang 		data = tail->data;
205a9643ea8Slogwang 
206a9643ea8Slogwang 		/* free the node */
207a9643ea8Slogwang 		_qnode_free(tail);
208a9643ea8Slogwang 
209a9643ea8Slogwang 		DIAG_COUNT_INC(q, rd);
210a9643ea8Slogwang 		DIAG_COUNT_DEC(q, size);
211a9643ea8Slogwang 		return data;
212a9643ea8Slogwang 	}
213a9643ea8Slogwang 	return NULL;
214a9643ea8Slogwang }
215a9643ea8Slogwang 
216a9643ea8Slogwang /*
217a9643ea8Slogwang  * Remove a node from a queue
218a9643ea8Slogwang  */
2192bfe3f2eSlogwang static __rte_always_inline void *
_lthread_queue_remove(struct lthread_queue * q)220a9643ea8Slogwang _lthread_queue_remove(struct lthread_queue *q)
221a9643ea8Slogwang {
222a9643ea8Slogwang 	void *data = NULL;
223a9643ea8Slogwang 
224a9643ea8Slogwang 	/*
225a9643ea8Slogwang 	 * There is a small window of inconsistency between producer and
226a9643ea8Slogwang 	 * consumer whereby the queue may appear empty if consumer and
227a9643ea8Slogwang 	 * producer access it at the same time. We handle this by retrying
228a9643ea8Slogwang 	 */
229a9643ea8Slogwang 	do {
230a9643ea8Slogwang 		data = _lthread_queue_poll(q);
231a9643ea8Slogwang 
232a9643ea8Slogwang 		if (likely(data != NULL)) {
233a9643ea8Slogwang 
234a9643ea8Slogwang 			DIAG_COUNT_INC(q, rd);
235a9643ea8Slogwang 			DIAG_COUNT_DEC(q, size);
236a9643ea8Slogwang 			return data;
237a9643ea8Slogwang 		}
238a9643ea8Slogwang 		rte_compiler_barrier();
239a9643ea8Slogwang 	} while (unlikely(!_lthread_queue_empty(q)));
240a9643ea8Slogwang 	return NULL;
241a9643ea8Slogwang }
242a9643ea8Slogwang 
2432bfe3f2eSlogwang #ifdef __cplusplus
2442bfe3f2eSlogwang }
2452bfe3f2eSlogwang #endif
246a9643ea8Slogwang 
247a9643ea8Slogwang #endif				/* LTHREAD_QUEUE_H_ */
248