/*
 *-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from the producer
 * consumer queues described by Dmitry Vyukov and published here:
 * http://www.1024cores.net
 *
 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 * this list of conditions and the following disclaimer in the documentation
 * and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of nodes
 * preallocated in this way is a parameter of _qnode_pool_create(). Freeing a
 * node returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables threads that have been affined to a
 * different scheduler to free nodes safely.
 */

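/*
 * Minimal usage sketch (illustrative only: the pool is normally created and
 * owned by the lthread scheduler, and "obj" below is just a placeholder
 * payload):
 *
 *	struct qnode_pool *pool = _qnode_pool_create("sched-pool", 64);
 *	struct qnode *n = _qnode_alloc();   taken from THIS_SCHED's pool
 *	n->data = obj;                      carry a payload
 *	_qnode_free(n);                     returned to its owning pool
 *	_qnode_pool_destroy(pool);          only once the pool is empty
 */
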
struct qnode;
struct qnode_cache;

/*
 * Define the intermediate node
 */
struct qnode {
	struct qnode *next;		/* next node in the pool/queue list */
	void *data;			/* payload carried when used as a queue node */
	struct qnode_pool *pool;	/* owning pool, consulted when the node is freed */
} __rte_cache_aligned;

/*
 * A pool structure
 */
struct qnode_pool {
	struct qnode *head;		/* insert end, exchanged by producers */
	struct qnode *stub;		/* dummy node used when the pool drains */
	struct qnode *fast_alloc;	/* single-entry cache of the last locally freed node */
	struct qnode *tail __rte_cache_aligned;	/* remove end, used only by the owning consumer */
	int pre_alloc;			/* number of nodes added when the pool runs dry */
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */

static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size) {

	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}


/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
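	/*
	 * __sync_lock_test_and_set() acts as an atomic exchange here: it
	 * publishes n as the new head and returns the previous head, so
	 * concurrent producers serialise on this single operation.
	 */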
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set, */
	/* which is why remove must retry */
	prev->next = n;
}

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty; this requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

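	/*
	 * tail->next is NULL: either tail is the last visible node in the
	 * pool, or a producer has exchanged the head but not yet linked its
	 * node (the inconsistency window noted in _qnode_pool_insert()).
	 */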
	head = p->head;
	if (tail == head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}


/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

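		/*
		 * The compiler barrier forces head/tail to be re-read; keep
		 * retrying until the pool looks genuinely empty (tail is back
		 * at the stub, or only a single visible node remains).
		 */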
		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

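	/*
	 * fast_alloc is a single-entry cache filled by _qnode_free(); taking
	 * the node from here avoids the lock-free queue operations on the
	 * common alloc/free path.
	 */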
	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
		/* guard against a failed refill (e.g. prealloc_size == 0) */
		if (unlikely(n == NULL))
			return NULL;
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}

/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

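	/*
	 * If the fast slot is already occupied, or the node belongs to a
	 * different scheduler's pool, return it through the lock-free insert;
	 * otherwise stash it in this pool's single-entry fast_alloc cache.
	 */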
	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}

/*
 * Destroy a qnode pool
 * The pool must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif				/* LTHREAD_POOL_H_ */