/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of nodes
 * preallocated in this way is a parameter of _qnode_pool_create(). Freeing a
 * node returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables lthreads that have been affined to a
 * different scheduler to free nodes safely.
 */
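
/*
 * Minimal usage sketch (illustrative only; assumes the calling lthread runs
 * under a scheduler and that the scheduler has installed a pool created with
 * _qnode_pool_create() as (THIS_SCHED)->qnode_pool):
 *
 *	struct qnode *n = _qnode_alloc();	// take a node from the local pool
 *	if (n != NULL) {
 *		n->data = NULL;			// attach payload as required
 *		_qnode_free(n);			// return it to its owning pool
 *	}
 */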

struct qnode;
struct qnode_cache;

/*
 * define intermediate node
 */
struct qnode {
	struct qnode *next;
	void *data;
	struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * a pool structure
 */
struct qnode_pool {
	struct qnode *head;
	struct qnode *stub;
	struct qnode *fast_alloc;
	struct qnode *tail __rte_cache_aligned;
	int pre_alloc;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */

static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size) {

	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}


/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev next is set */
	/* which is why remove must retry */
	prev->next = (n);
}

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation, the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail == head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}


/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty, add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

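	/* Fast path: reuse the single node cached in fast_alloc, if present */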
	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}


/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

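	/*
	 * If the fast_alloc slot is already occupied, or the node belongs to
	 * another scheduler's pool, return it through the lock-free insert;
	 * otherwise cache it in fast_alloc for the next local allocation.
	 */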
	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}

/*
 * Destroy a qnode pool
 * The queue must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif				/* LTHREAD_POOL_H_ */