1 /*
2  *-
3  *   BSD LICENSE
4  *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34 
35 /*
 * Some portions of this software are derived from the producer/consumer
 * queues described by Dmitry Vyukov and published here:
 * http://www.1024cores.net
39  *
40  * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41  * Redistribution and use in source and binary forms, with or without
42  * modification, are permitted provided that the following conditions
 * are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  * this list of conditions and the following disclaimer.
47  *
48  * 2. Redistributions in binary form must reproduce the above copyright notice,
49  * this list of conditions and the following disclaimer in the documentation
50  * and/or other materials provided with the distribution.
51  *
52  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
63  *
64  * The views and conclusions contained in the software and documentation are
65  * those of the authors and should not be interpreted as representing official
66  * policies, either expressed or implied, of Dmitry Vyukov.
67  */
68 
69 #ifndef LTHREAD_POOL_H_
70 #define LTHREAD_POOL_H_
71 
72 #ifdef __cplusplus
73 extern "C" {
74 #endif
75 
76 #include <rte_malloc.h>
77 #include <rte_per_lcore.h>
78 #include <rte_log.h>
79 
80 #include "lthread_int.h"
81 #include "lthread_diag.h"
82 
83 /*
84  * This file implements pool of queue nodes used by the queue implemented
85  * in lthread_queue.h.
86  *
87  * The pool is an intrusive lock free MPSC queue.
88  *
89  * The pool is created empty and populated lazily, i.e. on first attempt to
90  * allocate a the pool.
91  *
92  * Whenever the pool is empty more nodes are added to the pool
93  * The number of nodes preallocated in this way is a parameter of
94  * _qnode_pool_create. Freeing an object returns it to the pool.
95  *
96  * Each lthread scheduler maintains its own pool of nodes. L-threads must always
97  * allocate from this local pool ( because it is a single consumer queue ).
98  * L-threads can free nodes to any pool (because it is a multi producer queue)
99  * This enables threads that have affined to a different scheduler to free
100  * nodes safely.
101  */
102 
103 struct qnode;
104 struct qnode_cache;
105 
106 /*
107  * define intermediate node
108  */
109 struct qnode {
110 	struct qnode *next;
111 	void *data;
112 	struct qnode_pool *pool;
113 } __rte_cache_aligned;
114 
115 /*
116  * a pool structure
117  */
118 struct qnode_pool {
119 	struct qnode *head;
120 	struct qnode *stub;
121 	struct qnode *fast_alloc;
122 	struct qnode *tail __rte_cache_aligned;
123 	int pre_alloc;
124 	char name[LT_MAX_NAME_SIZE];
125 
126 	DIAG_COUNT_DEFINE(rd);
127 	DIAG_COUNT_DEFINE(wr);
128 	DIAG_COUNT_DEFINE(available);
129 	DIAG_COUNT_DEFINE(prealloc);
130 	DIAG_COUNT_DEFINE(capacity);
131 } __rte_cache_aligned;
132 
133 /*
134  * Create a pool of qnodes
135  */
136 
static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
140 	struct qnode_pool *p = rte_malloc_socket(NULL,
141 					sizeof(struct qnode_pool),
142 					RTE_CACHE_LINE_SIZE,
143 					rte_socket_id());
144 
145 	RTE_ASSERT(p);
146 
147 	p->stub = rte_malloc_socket(NULL,
148 				sizeof(struct qnode),
149 				RTE_CACHE_LINE_SIZE,
150 				rte_socket_id());
151 
152 	RTE_ASSERT(p->stub);
153 
154 	if (name != NULL)
155 		strncpy(p->name, name, LT_MAX_NAME_SIZE);
156 	p->name[sizeof(p->name)-1] = 0;
157 
158 	p->stub->pool = p;
159 	p->stub->next = NULL;
160 	p->tail = p->stub;
161 	p->head = p->stub;
162 	p->pre_alloc = prealloc_size;
163 
164 	DIAG_COUNT_INIT(p, rd);
165 	DIAG_COUNT_INIT(p, wr);
166 	DIAG_COUNT_INIT(p, available);
167 	DIAG_COUNT_INIT(p, prealloc);
168 	DIAG_COUNT_INIT(p, capacity);
169 
170 	return p;
171 }
172 
173 
174 /*
175  * Insert a node into the pool
176  */
177 static __rte_always_inline void
178 _qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
179 {
180 	n->next = NULL;
181 	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/*
	 * Atomically swinging the head leaves a window of inconsistency
	 * until prev->next is set, which is why remove must retry.
	 * Note: the casts assume 64-bit pointers.
	 */
	prev->next = n;
188 }
189 
190 /*
191  * Remove a node from the pool
192  *
193  * There is a race with _qnode_pool_insert() whereby the queue could appear
194  * empty during a concurrent insert, this is handled by retrying
195  *
196  * The queue uses a stub node, which must be swung as the queue becomes
197  * empty, this requires an insert of the stub, which means that removing the
198  * last item from the queue incurs the penalty of an atomic exchange. Since the
199  * pool is maintained with a bulk pre-allocation the cost of this is amortised.
200  */
201 static __rte_always_inline struct qnode *
202 _pool_remove(struct qnode_pool *p)
203 {
204 	struct qnode *head;
205 	struct qnode *tail = p->tail;
206 	struct qnode *next = tail->next;
207 
208 	/* we remove from the tail */
209 	if (tail == p->stub) {
210 		if (next == NULL)
211 			return NULL;
212 		/* advance the tail */
213 		p->tail = next;
214 		tail = next;
215 		next = next->next;
216 	}
217 	if (likely(next != NULL)) {
218 		p->tail = next;
219 		return tail;
220 	}
221 
222 	head = p->head;
223 	if (tail == head)
224 		return NULL;
225 
226 	/* swing stub node */
227 	_qnode_pool_insert(p, p->stub);
228 
229 	next = tail->next;
230 	if (next) {
231 		p->tail = next;
232 		return tail;
233 	}
234 	return NULL;
235 }
236 
237 
238 /*
239  * This adds a retry to the _pool_remove function
240  * defined above
241  */
242 static __rte_always_inline struct qnode *
243 _qnode_pool_remove(struct qnode_pool *p)
244 {
245 	struct qnode *n;
246 
247 	do {
248 		n = _pool_remove(p);
249 		if (likely(n != NULL))
250 			return n;
251 
252 		rte_compiler_barrier();
253 	}  while ((p->head != p->tail) &&
254 			(p->tail != p->stub));
255 	return NULL;
256 }
257 
258 /*
259  * Allocate a node from the pool
260  * If the pool is empty add mode nodes
261  */
262 static __rte_always_inline struct qnode *
263 _qnode_alloc(void)
264 {
265 	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
266 	int prealloc_size = p->pre_alloc;
267 	struct qnode *n;
268 	int i;
269 
270 	if (likely(p->fast_alloc != NULL)) {
271 		n = p->fast_alloc;
272 		p->fast_alloc = NULL;
273 		return n;
274 	}
275 
276 	n = _qnode_pool_remove(p);
277 
278 	if (unlikely(n == NULL)) {
279 		DIAG_COUNT_INC(p, prealloc);
280 		for (i = 0; i < prealloc_size; i++) {
281 			n = rte_malloc_socket(NULL,
282 					sizeof(struct qnode),
283 					RTE_CACHE_LINE_SIZE,
284 					rte_socket_id());
285 			if (n == NULL)
286 				return NULL;
287 
288 			DIAG_COUNT_INC(p, available);
289 			DIAG_COUNT_INC(p, capacity);
290 
291 			n->pool = p;
292 			_qnode_pool_insert(p, n);
293 		}
		n = _qnode_pool_remove(p);
		/* Defensive: e.g. if prealloc_size is 0 the pool stays empty */
		if (unlikely(n == NULL))
			return NULL;
	}
	n->pool = p;
297 	DIAG_COUNT_INC(p, rd);
298 	DIAG_COUNT_DEC(p, available);
299 	return n;
300 }
301 
302 
303 
304 /*
305 * free a queue node to the per scheduler pool from which it came
306 */
307 static __rte_always_inline void
308 _qnode_free(struct qnode *n)
309 {
	struct qnode_pool *p = n->pool;

	/*
	 * If the single-slot fast_alloc cache is already occupied, or the
	 * node belongs to another scheduler's pool, return it via the
	 * multi-producer insert; otherwise park it in fast_alloc so the
	 * next local allocation can take it without touching the queue.
	 */
	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
321 }
322 
323 /*
324  * Destroy an qnode pool
325  * queue must be empty when this is called
326  */
327 static inline int
328 _qnode_pool_destroy(struct qnode_pool *p)
329 {
330 	rte_free(p->stub);
331 	rte_free(p);
332 	return 0;
333 }
334 
335 #ifdef __cplusplus
336 }
337 #endif
338 
339 #endif				/* LTHREAD_POOL_H_ */
340