/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of
 * nodes preallocated in this way is a parameter of _qnode_pool_create().
 * Freeing an object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables threads that are affined to a
 * different scheduler to free nodes safely.
 */
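
/*
 * Illustrative usage sketch (not code taken from the scheduler sources):
 * it assumes a running lthread scheduler whose qnode_pool field points at
 * the created pool, so that _qnode_alloc()/_qnode_free() below resolve
 * THIS_SCHED correctly. The pool name and preallocation size are arbitrary
 * example values.
 *
 *	struct qnode_pool *pool = _qnode_pool_create("example", 64);
 *	(THIS_SCHED)->qnode_pool = pool;
 *
 *	struct qnode *n = _qnode_alloc();	// always from the local pool
 *	n->data = NULL;				// carry a payload pointer
 *	_qnode_free(n);				// safe from any scheduler
 *
 *	_qnode_pool_destroy(pool);		// only once the pool is empty
 */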

struct qnode;
struct qnode_cache;

/*
 * Define an intermediate node.
 */
struct qnode {
	struct qnode *next;
	void *data;
	struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * A pool structure.
 */
struct qnode_pool {
	struct qnode *head;		/* producers push here (atomic exchange) */
	struct qnode *stub;		/* dummy node, never handed out */
	struct qnode *fast_alloc;	/* single-entry local cache */
	struct qnode *tail __rte_cache_aligned;	/* consumer removes here */
	int pre_alloc;			/* nodes added whenever the pool runs dry */
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes.
 */

static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	/* the pool starts containing only the stub node */
	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}


/*
 * Insert a node into the pool.
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = (n);
}

/*
 * Remove a node from the pool.
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail == head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}


/*
 * This adds a retry to the _pool_remove function
 * defined above.
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool.
 * If the pool is empty add more nodes.
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	/* fast path: reuse a node parked by a local free */
	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		/* pool is empty: grow it by pre_alloc nodes */
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}
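
/*
 * Example: with pre_alloc == 64, an allocation that finds the pool empty
 * creates 64 new nodes, pushes them all into the pool and then pops one of
 * them for the caller, leaving the remainder available for subsequent
 * allocations.
 */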


/*
 * Free a queue node to the per-scheduler pool from which it came.
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	/* park the node in the local single-entry cache */
	p->fast_alloc = n;
}
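
/*
 * Because n->pool always points at the pool the node was allocated from,
 * a node allocated by an lthread on one scheduler and freed by an lthread
 * running on another scheduler is returned to its originating pool through
 * the multi-producer insert above; only a free to the local scheduler's
 * pool may use the fast_alloc slot.
 */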

/*
 * Destroy a qnode pool.
 * The queue must be empty when this is called.
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_POOL_H_ */