/*
 *-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from the producer
 * consumer queues described by Dmitry Vyukov and published here
 * http://www.1024cores.net
 *
 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_malloc.h>
#include <rte_per_lcore.h>
#include <rte_log.h>

#include "lthread_int.h"
#include "lthread_diag.h"

/*
 * This file implements a pool of queue nodes used by the queue implemented
 * in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first attempt
 * to allocate a node from the pool.
 *
 * Whenever the pool is empty more nodes are added to it; the number of nodes
 * preallocated in this way is a parameter of _qnode_pool_create(). Freeing an
 * object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single consumer
 * queue). L-threads can free nodes to any pool (because it is a multi
 * producer queue). This enables threads that have been affined to a
 * different scheduler to free nodes safely.
 *
 * An illustrative usage sketch of this API appears at the end of this file.
 */

struct qnode;
struct qnode_cache;

/*
 * define intermediate node
 */
struct qnode {
	struct qnode *next;
	void *data;
	struct qnode_pool *pool;
} __rte_cache_aligned;

/*
 * a pool structure
 */
struct qnode_pool {
	struct qnode *head;
	struct qnode *stub;
	struct qnode *fast_alloc;
	struct qnode *tail __rte_cache_aligned;
	int pre_alloc;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;

/*
 * Create a pool of qnodes
 */
static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}


/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
						(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;
}

/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange.
 * Since the pool is maintained with a bulk pre-allocation the cost of this
 * is amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail == head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next) {
		p->tail = next;
		return tail;
	}
	return NULL;
}


/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}

/*
 * Allocate a node from the pool
 * If the pool is empty add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}


/*
 * Free a queue node to the per scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	p->fast_alloc = n;
}

/*
 * Destroy a qnode pool
 * queue must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_POOL_H_ */
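
/*
 * Illustrative usage sketch: a minimal outline of how a scheduler might wire
 * this pool in, assuming it runs on an lcore where THIS_SCHED (defined in
 * lthread_int.h) points at the local scheduler. The pool name and the
 * pre-allocation size of 64 are arbitrary values chosen for illustration,
 * and the LTHREAD_POOL_USAGE_SKETCH guard is a hypothetical macro that is
 * never defined, so this sketch is not compiled into the header.
 */
#ifdef LTHREAD_POOL_USAGE_SKETCH
static inline void
_qnode_pool_usage_sketch(void)
{
	struct qnode_pool *pool;
	struct qnode *n;

	/* Create the per scheduler pool and make it the local allocation
	 * source consulted by _qnode_alloc().
	 */
	pool = _qnode_pool_create("sketch-pool", 64);
	(THIS_SCHED)->qnode_pool = pool;

	/* Allocate on the owning scheduler only (single consumer side). */
	n = _qnode_alloc();
	if (n != NULL) {
		n->data = NULL;	/* a payload would be attached here */

		/* Any scheduler may return the node to its home pool
		 * (multi producer side).
		 */
		_qnode_free(n);
	}

	/* Tear down once the queue built on the pool is empty. */
	_qnode_pool_destroy(pool);
}
#endif /* LTHREAD_POOL_USAGE_SKETCH */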