/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it; the number of
 * nodes preallocated in this way is a parameter of _qnode_pool_create().
 * Freeing a node returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables threads that have been affined to a
 * different scheduler to free nodes safely.
 */
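
/*
 * Illustrative usage (a sketch only, not part of the pool API itself):
 * nodes are always allocated from the current scheduler's pool, but may be
 * freed from any lthread, including one running on a different scheduler.
 *
 *	struct qnode *n = _qnode_alloc();
 *	if (n != NULL)
 *		_qnode_free(n);
 *
 * _qnode_alloc() draws from (THIS_SCHED)->qnode_pool, while _qnode_free()
 * returns the node to the pool recorded in n->pool.
 */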

struct qnode;
struct qnode_cache;

/*
 * Define the intermediate node
 */
struct qnode {
	struct qnode *next;		/* next node in the pool list */
	void *data;			/* payload used by lthread_queue.h */
	struct qnode_pool *pool;	/* pool that owns this node */
} __rte_cache_aligned;

/*
 * A pool structure
 */
struct qnode_pool {
	struct qnode *head;		/* producers insert here (atomic exchange) */
	struct qnode *stub;		/* permanent stub node for the MPSC algorithm */
	struct qnode *fast_alloc;	/* single-entry cache of the last locally freed node */
	struct qnode *tail __rte_cache_aligned;	/* consumer removes from here */
	int pre_alloc;			/* nodes to add whenever the pool runs empty */
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */

static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	/* the pool initially contains only the stub node */
	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}

/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;
}
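
/*
 * Note on the insert protocol above: a producer first publishes itself as
 * the new head with an atomic exchange, and only then links the previous
 * head to the new node. Between those two steps a consumer walking from the
 * tail can find a NULL next pointer even though head has already moved on,
 * which is why the remove path below must retry rather than treat NULL as
 * definitively empty.
 */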

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail past the stub */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail == head)
		return NULL;

	/* swing the stub node back into the queue */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}

/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}
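
/*
 * The retry above spins only while the pool looks non-empty (head != tail)
 * and the tail is not parked on the stub; otherwise NULL is returned and
 * the pool's allocator, _qnode_alloc(), reacts by pre-allocating more
 * nodes.
 */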

/*
 * Allocate a node from the pool
 * If the pool is empty add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	/* fast path: reuse the last locally freed node */
	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		/* the pool is empty, bulk allocate pre_alloc new nodes */
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}
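
/*
 * Note: fast_alloc is a single-entry cache. The most recently freed local
 * node is parked there by _qnode_free() below, so the common pattern of an
 * allocation quickly followed by a free on the same scheduler bypasses the
 * MPSC queue entirely.
 */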

/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}
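
/*
 * Because each node records its owning pool, an lthread that has moved to a
 * different scheduler still returns a node to the pool it was allocated
 * from; the multi-producer insert makes this safe, while fast_alloc is only
 * ever filled with a node that belongs to the local pool.
 */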

/*
 * Destroy a qnode pool
 * The queue must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif				/* LTHREAD_POOL_H_ */