/*
 *-
 *   BSD LICENSE
 *
 *   Copyright(c) 2015 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Some portions of this software are derived from the producer
 * consumer queues described by Dmitry Vyukov and published here
 * http://www.1024cores.net
 *
 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */

#ifndef LTHREAD_QUEUE_H_
#define LTHREAD_QUEUE_H_

#include <string.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>
#include <rte_malloc.h>

#include "lthread_int.h"
#include "lthread.h"
#include "lthread_diag.h"
#include "lthread_pool.h"

struct lthread_queue;

/*
 * This file implements an unbounded FIFO queue based on a lock-free
 * linked list.
 *
 * The queue is non-intrusive in that it uses intermediate nodes, and does
 * not require these nodes to be embedded in the object being placed
 * in the queue.
 *
 * This is slightly more efficient than the very similar queue in lthread_pool
 * in that it does not have to swing a stub node as the queue becomes empty.
 *
 * The queue access functions allocate and free intermediate nodes
 * transparently from/to a per-scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
 */

/*
 * Define a queue of lthread nodes
 */
struct lthread_queue {
	struct qnode *head;
	struct qnode *tail __rte_cache_aligned;
	struct lthread_queue *p;
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(size);

} __rte_cache_aligned;

static inline struct lthread_queue *
_lthread_queue_create(const char *name)
{
	struct qnode *stub;
	struct lthread_queue *new_queue;

	new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
				      RTE_CACHE_LINE_SIZE,
				      rte_socket_id());
	if (new_queue == NULL)
		return NULL;

	/* allocate the stub node */
	stub = _qnode_alloc();
	RTE_ASSERT(stub);

	if (name != NULL)
		strncpy(new_queue->name, name, sizeof(new_queue->name));
	new_queue->name[sizeof(new_queue->name)-1] = 0;

	/* initialize queue as empty */
	stub->next = NULL;
	new_queue->head = stub;
	new_queue->tail = stub;

	DIAG_COUNT_INIT(new_queue, rd);
	DIAG_COUNT_INIT(new_queue, wr);
	DIAG_COUNT_INIT(new_queue, size);

	return new_queue;
}

/**
 * Return true if the queue is empty
 */
static inline int __attribute__ ((always_inline))
_lthread_queue_empty(struct lthread_queue *q)
{
	return q->tail == q->head;
}

/**
 * Destroy a queue
 * fail if queue is not empty
 */
static inline int _lthread_queue_destroy(struct lthread_queue *q)
{
	if (q == NULL)
		return -1;

	if (!_lthread_queue_empty(q))
		return -1;

	_qnode_free(q->head);
	rte_free(q);
	return 0;
}

RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);

/*
 * Insert a node into a queue
 * this implementation is multi producer safe
 */
static inline struct qnode *__attribute__ ((always_inline))
_lthread_queue_insert_mp(struct lthread_queue *q, void *data)
{
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set object in node */
	n->data = data;
	n->next = NULL;

	/* this is an MPSC method, atomically exchange the head pointer */
	prev = (struct qnode *)__sync_lock_test_and_set((uint64_t *) &q->head,
							(uint64_t) n);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}
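
/*
 * Illustrative pairing of the MPSC insert with the single consumer; this is
 * a sketch only, assuming the queue is owned by one scheduler whose
 * per-lcore node pool has been initialised, and the names "q" and "lt"
 * below are hypothetical rather than symbols defined in this header.
 *
 *  producer (any lcore):
 *	_lthread_queue_insert_mp(q, lt);
 *
 *  consumer (owning scheduler only):
 *	struct lthread *lt = _lthread_queue_remove(q);
 *
 * The consumer retries internally to cover the brief window in which a
 * producer has exchanged q->head but has not yet linked prev->next.
 */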

/*
 * Insert a node into a queue in single producer mode
 * this implementation is NOT multi producer safe
 */
static inline struct qnode *__attribute__ ((always_inline))
_lthread_queue_insert_sp(struct lthread_queue *q, void *data)
{
	/* allocate a queue node */
	struct qnode *prev;
	struct qnode *n = _qnode_alloc();

	if (n == NULL)
		return NULL;

	/* set data in node */
	n->data = data;
	n->next = NULL;

	/* this is an SPSC method, no need for a locked exchange operation */
	prev = q->head;
	prev->next = q->head = n;

	DIAG_COUNT_INC(q, wr);
	DIAG_COUNT_INC(q, size);

	return n;
}

/*
 * Try once to remove a node from a queue, returning NULL if it appears empty
 */
static inline void *__attribute__ ((always_inline))
_lthread_queue_poll(struct lthread_queue *q)
{
	void *data = NULL;
	struct qnode *tail = q->tail;
	struct qnode *next = (struct qnode *)tail->next;
	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time.
	 * The consumer must handle this by retrying
	 */

	if (likely(next != NULL)) {
		q->tail = next;
		tail->data = next->data;
		data = tail->data;

		/* free the node */
		_qnode_free(tail);

		DIAG_COUNT_INC(q, rd);
		DIAG_COUNT_DEC(q, size);
		return data;
	}
	return NULL;
}

/*
 * Remove a node from a queue, retrying while the queue appears non-empty
 */
static inline void *__attribute__ ((always_inline))
_lthread_queue_remove(struct lthread_queue *q)
{
	void *data = NULL;

	/*
	 * There is a small window of inconsistency between producer and
	 * consumer whereby the queue may appear empty if consumer and
	 * producer access it at the same time. We handle this by retrying
	 */
	do {
		data = _lthread_queue_poll(q);

		if (likely(data != NULL)) {
			/* the diag counters were already updated by
			 * _lthread_queue_poll()
			 */
			return data;
		}
		rte_compiler_barrier();
	} while (unlikely(!_lthread_queue_empty(q)));
	return NULL;
}

#endif /* LTHREAD_QUEUE_H_ */
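
/*
 * End-to-end usage sketch for the single-producer path, assuming the calls
 * are made from the lcore whose scheduler owns the queue and whose qnode
 * pool has already been initialised; "work", "payload" and "handle()" are
 * hypothetical names used only for illustration.
 *
 *	struct lthread_queue *q = _lthread_queue_create("work");
 *
 *	_lthread_queue_insert_sp(q, payload);
 *
 *	void *p = _lthread_queue_poll(q);
 *	if (p != NULL)
 *		handle(p);
 *
 *	// destroy succeeds (returns 0) only once the queue has drained
 *	_lthread_queue_destroy(q);
 */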