/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <string.h>

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it. The number of nodes
 * preallocated in this way is a parameter of _qnode_pool_create(). Freeing an
 * object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single-consumer
 * queue). L-threads can free nodes to any pool (because it is a
 * multi-producer queue). This enables threads that have affined to a
 * different scheduler to free nodes safely.
 */

struct qnode;
struct qnode_cache;

/*
 * define intermediate node
 */
struct qnode {
	struct qnode *next;
	void *data;
	struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * a pool structure
 */
struct qnode_pool {
	struct qnode *head;
	struct qnode *stub;
	struct qnode *fast_alloc;
	struct qnode *tail __rte_cache_aligned;
	int pre_alloc;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */
static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}


/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = (n);
}
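/*
 * Illustrative sketch, not part of the original file: the helper below mirrors
 * the lazy-growth loop in _qnode_alloc() further down, but in isolation, to
 * show how the multi-producer insert can be used to bulk-populate a pool. The
 * function name _qnode_pool_prefill is hypothetical, and the diagnostic
 * counters are deliberately omitted for brevity.
 */
static inline int
_qnode_pool_prefill(struct qnode_pool *p, int count)
{
	struct qnode *n;
	int i;

	for (i = 0; i < count; i++) {
		n = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());
		if (n == NULL)
			return -1;

		/* record the owning pool so a later free can return the node */
		n->pool = p;
		/* multi-producer insert: safe from any thread */
		_qnode_pool_insert(p, n);
	}
	return 0;
}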
/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail != head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}


/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty, add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}


/*
 * Free a queue node to the per-scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}

/*
 * Destroy a qnode pool
 * The pool must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_POOL_H_ */
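/*
 * Usage sketch (illustrative only, not part of the original file). It assumes
 * an initialised lthread scheduler so that THIS_SCHED is valid; "payload" is
 * a hypothetical value:
 *
 *	struct qnode *n = _qnode_alloc();
 *	if (n != NULL) {
 *		n->data = payload;
 *		_qnode_free(n);
 *	}
 *
 * _qnode_alloc() must run on the scheduler that owns the pool, since it is
 * the single consumer; _qnode_free() may run on any scheduler (multiple
 * producers) and returns the node to the pool recorded in n->pool.
 */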