/*
 *-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from the producer
 * consumer queues described by Dmitry Vyukov and published here:
 * http://www.1024cores.net
 *
 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of nodes
 * preallocated in this way is a parameter of _qnode_pool_create(). Freeing an
 * object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single consumer
 * queue). L-threads can free nodes to any pool (because it is a multi
 * producer queue). This enables threads that have been affined to a
 * different scheduler to free nodes safely.
 */

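/*
 * Typical usage (an illustrative sketch only; the real consumer of this
 * API is the queue in lthread_queue.h):
 *
 *	struct qnode *n = _qnode_alloc();   // always from THIS_SCHED's pool
 *	if (n != NULL) {
 *		n->data = item;             // 'item' is a placeholder
 *		_qnode_free(n);             // legal from any scheduler
 *	}
 */
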
struct qnode;
struct qnode_cache;

/*
 * define intermediate node
 */
struct qnode {
	struct qnode *next;
	void *data;
	struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * a pool structure
 */
struct qnode_pool {
	struct qnode *head;
	struct qnode *stub;
	struct qnode *fast_alloc;
	struct qnode *tail __rte_cache_aligned;
	int pre_alloc;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */
static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, sizeof(p->name));
	else
		p->name[0] = '\0';
	p->name[sizeof(p->name)-1] = '\0';

	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}
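
/*
 * Example (an illustrative sketch): each scheduler creates its own pool at
 * start-up; the name string and the prealloc batch size of 64 shown here
 * are arbitrary choices.
 *
 *	(THIS_SCHED)->qnode_pool = _qnode_pool_create("qnode pool", 64);
 */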


/*
 * Insert a node into the pool
 */
static inline void __attribute__ ((always_inline))
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	/* we insert at the head: atomically swap in the new node */
	struct qnode *prev = (struct qnode *)
		__sync_lock_test_and_set((uint64_t *)&p->head, (uint64_t) n);
	/*
	 * there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;
}

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static inline struct qnode *__attribute__ ((always_inline))
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail != head)
		/* a producer is mid-insert; the caller must retry */
		return NULL;

	/* one last item remains: swing the stub node so it can be removed */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}
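
/*
 * Race sketch (illustrative): a producer in _qnode_pool_insert() may have
 * swapped p->head to its new node but not yet written prev->next. In that
 * window the chain from p->tail ends in NULL while p->head != p->tail, so
 * _pool_remove() reports an apparently empty queue; _qnode_pool_remove()
 * below retries until the state becomes consistent.
 */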

/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static inline struct qnode *__attribute__ ((always_inline))
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty add more nodes
 */
static inline struct qnode *__attribute__ ((always_inline))
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
		if (unlikely(n == NULL))
			return NULL;
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}
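
/*
 * Growth note (illustrative): when both fast_alloc and the queue are empty,
 * _qnode_alloc() preallocates p->pre_alloc fresh nodes in one batch, so the
 * cost of the stub-swing exchange is amortised over many allocations.
 */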

/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static inline void __attribute__ ((always_inline))
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	/*
	 * if the fast_alloc slot is already occupied, or the node belongs
	 * to another scheduler's pool, return it via the lock-free queue
	 */
	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}
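
/*
 * Fast path note (illustrative): a node freed on its owning scheduler is
 * parked in the one-deep fast_alloc slot, so a matching _qnode_alloc() on
 * the same scheduler can reclaim it without touching the lock-free queue
 * and without any atomic exchange.
 */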

/*
 * Destroy a qnode pool
 * The queue must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	struct qnode *n;

	/* drain and free any nodes still held in the pool */
	while ((n = _qnode_pool_remove(p)) != NULL)
		rte_free(n);

	rte_free(p->stub);
	rte_free(p);
	return 0;
}


#endif				/* LTHREAD_POOL_H_ */