1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2015 Intel Corporation.
4  * Copyright 2010-2011 Dmitry Vyukov
5  */
6 
7 #ifndef LTHREAD_QUEUE_H_
8 #define LTHREAD_QUEUE_H_
9 
10 #ifdef __cplusplus
11 extern "C" {
12 #endif
13 
14 #include <string.h>
15 
16 #include <rte_prefetch.h>
17 #include <rte_per_lcore.h>
18 
19 #include "lthread_int.h"
20 #include "lthread.h"
21 #include "lthread_diag.h"
22 #include "lthread_pool.h"
23 
24 struct lthread_queue;
25 
26 /*
27  * This file implements an unbounded FIFO queue based on a lock free
28  * linked list.
29  *
30  * The queue is non-intrusive in that it uses intermediate nodes, and does
31  * not require these nodes to be inserted into the object being placed
32  * in the queue.
33  *
34  * This is slightly more efficient than the very similar queue in lthread_pool
35  * in that it does not have to swing a stub node as the queue becomes empty.
36  *
37  * The queue access functions allocate and free intermediate node
38  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
39  *
40  * The queue provides both MPSC and SPSC insert methods
41  */
42 
/*
 * A queue of lthread nodes.
 *
 * The insert functions append at head and the poll/remove functions consume
 * from tail. The queue is empty when both pointers refer to the same node.
 */
struct lthread_queue {
	/* most recently inserted node; updated by the insert functions */
	struct qnode *head;
	/*
	 * node currently held by the consumer; the item to be returned next
	 * is stored in its successor.
	 * Cache aligned, presumably to keep consumer and producer fields on
	 * separate cache lines.
	 */
	struct qnode *tail __rte_cache_aligned;
	/* not referenced by the functions visible in this header */
	struct lthread_queue *p;
	/* queue name, NUL terminated by _lthread_queue_create() */
	char name[LT_MAX_NAME_SIZE];

	/*
	 * diagnostic counters: rd is bumped on removal, wr on insert,
	 * size is incremented on insert and decremented on removal
	 */
	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(size);

} __rte_cache_aligned;
57 
58 
59 
60 static inline struct lthread_queue *
_lthread_queue_create(const char * name)61 _lthread_queue_create(const char *name)
62 {
63 	struct qnode *stub;
64 	struct lthread_queue *new_queue;
65 
66 	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
67 					RTE_CACHE_LINE_SIZE,
68 					rte_socket_id());
69 	if (new_queue == NULL)
70 		return NULL;
71 
72 	/* allocated stub node */
73 	stub = _qnode_alloc();
74 	RTE_ASSERT(stub);
75 
76 	if (name != NULL)
77 		strncpy(new_queue->name, name, sizeof(new_queue->name));
78 	new_queue->name[sizeof(new_queue->name)-1] = 0;
79 
80 	/* initialize queue as empty */
81 	stub->next = NULL;
82 	new_queue->head = stub;
83 	new_queue->tail = stub;
84 
85 	DIAG_COUNT_INIT(new_queue, rd);
86 	DIAG_COUNT_INIT(new_queue, wr);
87 	DIAG_COUNT_INIT(new_queue, size);
88 
89 	return new_queue;
90 }
91 
92 /**
93  * Return true if the queue is empty
94  */
95 static __rte_always_inline int
_lthread_queue_empty(struct lthread_queue * q)96 _lthread_queue_empty(struct lthread_queue *q)
97 {
98 	return q->tail == q->head;
99 }
100 
101 
102 
103 /**
104  * Destroy a queue
105  * fail if queue is not empty
106  */
_lthread_queue_destroy(struct lthread_queue * q)107 static inline int _lthread_queue_destroy(struct lthread_queue *q)
108 {
109 	if (q == NULL)
110 		return -1;
111 
112 	if (!_lthread_queue_empty(q))
113 		return -1;
114 
115 	_qnode_free(q->head);
116 	rte_free(q);
117 	return 0;
118 }
119 
/*
 * Declaration of the per-lcore variable this_sched (a pointer to a
 * struct lthread_sched). The functions visible in this header do not name
 * it directly; presumably the qnode allocator or the diagnostic macros
 * rely on it - TODO confirm.
 */
RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
121 
122 /*
123  * Insert a node into a queue
124  * this implementation is multi producer safe
125  */
126 static __rte_always_inline struct qnode *
_lthread_queue_insert_mp(struct lthread_queue * q,void * data)127 _lthread_queue_insert_mp(struct lthread_queue
128 							  *q, void *data)
129 {
130 	struct qnode *prev;
131 	struct qnode *n = _qnode_alloc();
132 
133 	if (n == NULL)
134 		return NULL;
135 
136 	/* set object in node */
137 	n->data = data;
138 	n->next = NULL;
139 
140 	/* this is an MPSC method, perform a locked update */
141 	prev = n;
142 	prev =
143 	    (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
144 					       (uint64_t) prev);
145 	/* there is a window of inconsistency until prev next is set,
146 	 * which is why remove must retry
147 	 */
148 	prev->next = n;
149 
150 	DIAG_COUNT_INC(q, wr);
151 	DIAG_COUNT_INC(q, size);
152 
153 	return n;
154 }
155 
156 /*
157  * Insert an node into a queue in single producer mode
158  * this implementation is NOT mult producer safe
159  */
160 static __rte_always_inline struct qnode *
_lthread_queue_insert_sp(struct lthread_queue * q,void * data)161 _lthread_queue_insert_sp(struct lthread_queue
162 							  *q, void *data)
163 {
164 	/* allocate a queue node */
165 	struct qnode *prev;
166 	struct qnode *n = _qnode_alloc();
167 
168 	if (n == NULL)
169 		return NULL;
170 
171 	/* set data in node */
172 	n->data = data;
173 	n->next = NULL;
174 
175 	/* this is an SPSC method, no need for locked exchange operation */
176 	prev = q->head;
177 	prev->next = q->head = n;
178 
179 	DIAG_COUNT_INC(q, wr);
180 	DIAG_COUNT_INC(q, size);
181 
182 	return n;
183 }
184 
185 /*
186  * Remove a node from a queue
187  */
188 static __rte_always_inline void *
_lthread_queue_poll(struct lthread_queue * q)189 _lthread_queue_poll(struct lthread_queue *q)
190 {
191 	void *data = NULL;
192 	struct qnode *tail = q->tail;
193 	struct qnode *next = (struct qnode *)tail->next;
194 	/*
195 	 * There is a small window of inconsistency between producer and
196 	 * consumer whereby the queue may appear empty if consumer and
197 	 * producer access it at the same time.
198 	 * The consumer must handle this by retrying
199 	 */
200 
201 	if (likely(next != NULL)) {
202 		q->tail = next;
203 		tail->data = next->data;
204 		data = tail->data;
205 
206 		/* free the node */
207 		_qnode_free(tail);
208 
209 		DIAG_COUNT_INC(q, rd);
210 		DIAG_COUNT_DEC(q, size);
211 		return data;
212 	}
213 	return NULL;
214 }
215 
216 /*
217  * Remove a node from a queue
218  */
219 static __rte_always_inline void *
_lthread_queue_remove(struct lthread_queue * q)220 _lthread_queue_remove(struct lthread_queue *q)
221 {
222 	void *data = NULL;
223 
224 	/*
225 	 * There is a small window of inconsistency between producer and
226 	 * consumer whereby the queue may appear empty if consumer and
227 	 * producer access it at the same time. We handle this by retrying
228 	 */
229 	do {
230 		data = _lthread_queue_poll(q);
231 
232 		if (likely(data != NULL)) {
233 
234 			DIAG_COUNT_INC(q, rd);
235 			DIAG_COUNT_DEC(q, size);
236 			return data;
237 		}
238 		rte_compiler_barrier();
239 	} while (unlikely(!_lthread_queue_empty(q)));
240 	return NULL;
241 }
242 
243 #ifdef __cplusplus
244 }
245 #endif
246 
247 #endif				/* LTHREAD_QUEUE_H_ */
248