1 /* 2 *- 3 * BSD LICENSE 4 * 5 * Copyright(c) 2015 Intel Corporation. All rights reserved. 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * * Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * * Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * * Neither the name of Intel Corporation nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33 */ 34 35 /* 36 * Some portions of this software is derived from the producer 37 * consumer queues described by Dmitry Vyukov and published here 38 * http://www.1024cores.net 39 * 40 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. 41 * Redistribution and use in source and binary forms, with or without 42 * modification, are permitted provided that the following conditions 43 * are met: 44 * 45 * 1. Redistributions of source code must retain the above copyright notice, 46 * this list of conditions and the following disclaimer. 47 * 48 * 2. Redistributions in binary form must reproduce the above copyright notice, 49 * this list of conditions and the following disclaimer in the documentation 50 * and/or other materials provided with the distribution. 51 * 52 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" 53 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 54 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 55 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS 56 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, 57 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 58 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 59 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 60 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 61 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 62 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 63 * 64 * The views and conclusions contained in the software and documentation are 65 * those of the authors and should not be interpreted as representing official 66 * policies, either expressed or implied, of Dmitry Vyukov. 67 */ 68 69 #ifndef LTHREAD_QUEUE_H_ 70 #define LTHREAD_QUEUE_H_ 71 72 #ifdef __cplusplus 73 extern "C" { 74 #endif 75 76 #include <string.h> 77 78 #include <rte_prefetch.h> 79 #include <rte_per_lcore.h> 80 81 #include "lthread_int.h" 82 #include "lthread.h" 83 #include "lthread_diag.h" 84 #include "lthread_pool.h" 85 86 struct lthread_queue; 87 88 /* 89 * This file implements an unbounded FIFO queue based on a lock free 90 * linked list. 91 * 92 * The queue is non-intrusive in that it uses intermediate nodes, and does 93 * not require these nodes to be inserted into the object being placed 94 * in the queue. 95 * 96 * This is slightly more efficient than the very similar queue in lthread_pool 97 * in that it does not have to swing a stub node as the queue becomes empty. 98 * 99 * The queue access functions allocate and free intermediate node 100 * transparently from/to a per scheduler pool ( see lthread_pool.h ). 101 * 102 * The queue provides both MPSC and SPSC insert methods 103 */ 104 105 /* 106 * define a queue of lthread nodes 107 */ 108 struct lthread_queue { 109 struct qnode *head; 110 struct qnode *tail __rte_cache_aligned; 111 struct lthread_queue *p; 112 char name[LT_MAX_NAME_SIZE]; 113 114 DIAG_COUNT_DEFINE(rd); 115 DIAG_COUNT_DEFINE(wr); 116 DIAG_COUNT_DEFINE(size); 117 118 } __rte_cache_aligned; 119 120 121 122 static inline struct lthread_queue * 123 _lthread_queue_create(const char *name) 124 { 125 struct qnode *stub; 126 struct lthread_queue *new_queue; 127 128 new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue), 129 RTE_CACHE_LINE_SIZE, 130 rte_socket_id()); 131 if (new_queue == NULL) 132 return NULL; 133 134 /* allocated stub node */ 135 stub = _qnode_alloc(); 136 RTE_ASSERT(stub); 137 138 if (name != NULL) 139 strncpy(new_queue->name, name, sizeof(new_queue->name)); 140 new_queue->name[sizeof(new_queue->name)-1] = 0; 141 142 /* initialize queue as empty */ 143 stub->next = NULL; 144 new_queue->head = stub; 145 new_queue->tail = stub; 146 147 DIAG_COUNT_INIT(new_queue, rd); 148 DIAG_COUNT_INIT(new_queue, wr); 149 DIAG_COUNT_INIT(new_queue, size); 150 151 return new_queue; 152 } 153 154 /** 155 * Return true if the queue is empty 156 */ 157 static __rte_always_inline int 158 _lthread_queue_empty(struct lthread_queue *q) 159 { 160 return q->tail == q->head; 161 } 162 163 164 165 /** 166 * Destroy a queue 167 * fail if queue is not empty 168 */ 169 static inline int _lthread_queue_destroy(struct lthread_queue *q) 170 { 171 if (q == NULL) 172 return -1; 173 174 if (!_lthread_queue_empty(q)) 175 return -1; 176 177 _qnode_free(q->head); 178 rte_free(q); 179 return 0; 180 } 181 182 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched); 183 184 /* 185 * Insert a node into a queue 186 * this implementation is multi producer safe 187 */ 188 static __rte_always_inline struct qnode * 189 _lthread_queue_insert_mp(struct lthread_queue 190 *q, void *data) 191 { 192 struct qnode *prev; 193 struct qnode *n = _qnode_alloc(); 194 195 if (n == NULL) 196 return NULL; 197 198 /* set object in node */ 199 n->data = data; 200 n->next = NULL; 201 202 /* this is an MPSC method, perform a locked update */ 203 prev = n; 204 prev = 205 (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head, 206 (uint64_t) prev); 207 /* there is a window of inconsistency until prev next is set, 208 * which is why remove must retry 209 */ 210 prev->next = n; 211 212 DIAG_COUNT_INC(q, wr); 213 DIAG_COUNT_INC(q, size); 214 215 return n; 216 } 217 218 /* 219 * Insert an node into a queue in single producer mode 220 * this implementation is NOT mult producer safe 221 */ 222 static __rte_always_inline struct qnode * 223 _lthread_queue_insert_sp(struct lthread_queue 224 *q, void *data) 225 { 226 /* allocate a queue node */ 227 struct qnode *prev; 228 struct qnode *n = _qnode_alloc(); 229 230 if (n == NULL) 231 return NULL; 232 233 /* set data in node */ 234 n->data = data; 235 n->next = NULL; 236 237 /* this is an SPSC method, no need for locked exchange operation */ 238 prev = q->head; 239 prev->next = q->head = n; 240 241 DIAG_COUNT_INC(q, wr); 242 DIAG_COUNT_INC(q, size); 243 244 return n; 245 } 246 247 /* 248 * Remove a node from a queue 249 */ 250 static __rte_always_inline void * 251 _lthread_queue_poll(struct lthread_queue *q) 252 { 253 void *data = NULL; 254 struct qnode *tail = q->tail; 255 struct qnode *next = (struct qnode *)tail->next; 256 /* 257 * There is a small window of inconsistency between producer and 258 * consumer whereby the queue may appear empty if consumer and 259 * producer access it at the same time. 260 * The consumer must handle this by retrying 261 */ 262 263 if (likely(next != NULL)) { 264 q->tail = next; 265 tail->data = next->data; 266 data = tail->data; 267 268 /* free the node */ 269 _qnode_free(tail); 270 271 DIAG_COUNT_INC(q, rd); 272 DIAG_COUNT_DEC(q, size); 273 return data; 274 } 275 return NULL; 276 } 277 278 /* 279 * Remove a node from a queue 280 */ 281 static __rte_always_inline void * 282 _lthread_queue_remove(struct lthread_queue *q) 283 { 284 void *data = NULL; 285 286 /* 287 * There is a small window of inconsistency between producer and 288 * consumer whereby the queue may appear empty if consumer and 289 * producer access it at the same time. We handle this by retrying 290 */ 291 do { 292 data = _lthread_queue_poll(q); 293 294 if (likely(data != NULL)) { 295 296 DIAG_COUNT_INC(q, rd); 297 DIAG_COUNT_DEC(q, size); 298 return data; 299 } 300 rte_compiler_barrier(); 301 } while (unlikely(!_lthread_queue_empty(q))); 302 return NULL; 303 } 304 305 #ifdef __cplusplus 306 } 307 #endif 308 309 #endif /* LTHREAD_QUEUE_H_ */ 310