1*a9643ea8Slogwang /* 2*a9643ea8Slogwang *- 3*a9643ea8Slogwang * BSD LICENSE 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright(c) 2015 Intel Corporation. All rights reserved. 6*a9643ea8Slogwang * All rights reserved. 7*a9643ea8Slogwang * 8*a9643ea8Slogwang * Redistribution and use in source and binary forms, with or without 9*a9643ea8Slogwang * modification, are permitted provided that the following conditions 10*a9643ea8Slogwang * are met: 11*a9643ea8Slogwang * 12*a9643ea8Slogwang * * Redistributions of source code must retain the above copyright 13*a9643ea8Slogwang * notice, this list of conditions and the following disclaimer. 14*a9643ea8Slogwang * * Redistributions in binary form must reproduce the above copyright 15*a9643ea8Slogwang * notice, this list of conditions and the following disclaimer in 16*a9643ea8Slogwang * the documentation and/or other materials provided with the 17*a9643ea8Slogwang * distribution. 18*a9643ea8Slogwang * * Neither the name of Intel Corporation nor the names of its 19*a9643ea8Slogwang * contributors may be used to endorse or promote products derived 20*a9643ea8Slogwang * from this software without specific prior written permission. 21*a9643ea8Slogwang * 22*a9643ea8Slogwang * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23*a9643ea8Slogwang * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24*a9643ea8Slogwang * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25*a9643ea8Slogwang * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26*a9643ea8Slogwang * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27*a9643ea8Slogwang * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28*a9643ea8Slogwang * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29*a9643ea8Slogwang * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30*a9643ea8Slogwang * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31*a9643ea8Slogwang * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32*a9643ea8Slogwang * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33*a9643ea8Slogwang */ 34*a9643ea8Slogwang 35*a9643ea8Slogwang /* 36*a9643ea8Slogwang * Some portions of this software is derived from the producer 37*a9643ea8Slogwang * consumer queues described by Dmitry Vyukov and published here 38*a9643ea8Slogwang * http://www.1024cores.net 39*a9643ea8Slogwang * 40*a9643ea8Slogwang * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. 41*a9643ea8Slogwang * Redistribution and use in source and binary forms, with or without 42*a9643ea8Slogwang * modification, are permitted provided that the following conditions 43*a9643ea8Slogwang * are met: 44*a9643ea8Slogwang * 45*a9643ea8Slogwang * 1. Redistributions of source code must retain the above copyright notice, 46*a9643ea8Slogwang * this list of conditions and the following disclaimer. 47*a9643ea8Slogwang * 48*a9643ea8Slogwang * 2. Redistributions in binary form must reproduce the above copyright notice, 49*a9643ea8Slogwang * this list of conditions and the following disclaimer in the documentation 50*a9643ea8Slogwang * and/or other materials provided with the distribution. 51*a9643ea8Slogwang * 52*a9643ea8Slogwang * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" 53*a9643ea8Slogwang * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 54*a9643ea8Slogwang * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 55*a9643ea8Slogwang * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS 56*a9643ea8Slogwang * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, 57*a9643ea8Slogwang * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 58*a9643ea8Slogwang * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 59*a9643ea8Slogwang * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 60*a9643ea8Slogwang * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 61*a9643ea8Slogwang * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 62*a9643ea8Slogwang * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 63*a9643ea8Slogwang * 64*a9643ea8Slogwang * The views and conclusions contained in the software and documentation are 65*a9643ea8Slogwang * those of the authors and should not be interpreted as representing official 66*a9643ea8Slogwang * policies, either expressed or implied, of Dmitry Vyukov. 67*a9643ea8Slogwang */ 68*a9643ea8Slogwang 69*a9643ea8Slogwang #ifndef LTHREAD_QUEUE_H_ 70*a9643ea8Slogwang #define LTHREAD_QUEUE_H_ 71*a9643ea8Slogwang 72*a9643ea8Slogwang #include <string.h> 73*a9643ea8Slogwang 74*a9643ea8Slogwang #include <rte_prefetch.h> 75*a9643ea8Slogwang #include <rte_per_lcore.h> 76*a9643ea8Slogwang 77*a9643ea8Slogwang #include "lthread_int.h" 78*a9643ea8Slogwang #include "lthread.h" 79*a9643ea8Slogwang #include "lthread_diag.h" 80*a9643ea8Slogwang #include "lthread_pool.h" 81*a9643ea8Slogwang 82*a9643ea8Slogwang struct lthread_queue; 83*a9643ea8Slogwang 84*a9643ea8Slogwang /* 85*a9643ea8Slogwang * This file implements an unbounded FIFO queue based on a lock free 86*a9643ea8Slogwang * linked list. 87*a9643ea8Slogwang * 88*a9643ea8Slogwang * The queue is non-intrusive in that it uses intermediate nodes, and does 89*a9643ea8Slogwang * not require these nodes to be inserted into the object being placed 90*a9643ea8Slogwang * in the queue. 91*a9643ea8Slogwang * 92*a9643ea8Slogwang * This is slightly more efficient than the very similar queue in lthread_pool 93*a9643ea8Slogwang * in that it does not have to swing a stub node as the queue becomes empty. 94*a9643ea8Slogwang * 95*a9643ea8Slogwang * The queue access functions allocate and free intermediate node 96*a9643ea8Slogwang * transparently from/to a per scheduler pool ( see lthread_pool.h ). 97*a9643ea8Slogwang * 98*a9643ea8Slogwang * The queue provides both MPSC and SPSC insert methods 99*a9643ea8Slogwang */ 100*a9643ea8Slogwang 101*a9643ea8Slogwang /* 102*a9643ea8Slogwang * define a queue of lthread nodes 103*a9643ea8Slogwang */ 104*a9643ea8Slogwang struct lthread_queue { 105*a9643ea8Slogwang struct qnode *head; 106*a9643ea8Slogwang struct qnode *tail __rte_cache_aligned; 107*a9643ea8Slogwang struct lthread_queue *p; 108*a9643ea8Slogwang char name[LT_MAX_NAME_SIZE]; 109*a9643ea8Slogwang 110*a9643ea8Slogwang DIAG_COUNT_DEFINE(rd); 111*a9643ea8Slogwang DIAG_COUNT_DEFINE(wr); 112*a9643ea8Slogwang DIAG_COUNT_DEFINE(size); 113*a9643ea8Slogwang 114*a9643ea8Slogwang } __rte_cache_aligned; 115*a9643ea8Slogwang 116*a9643ea8Slogwang 117*a9643ea8Slogwang 118*a9643ea8Slogwang static inline struct lthread_queue * 119*a9643ea8Slogwang _lthread_queue_create(const char *name) 120*a9643ea8Slogwang { 121*a9643ea8Slogwang struct qnode *stub; 122*a9643ea8Slogwang struct lthread_queue *new_queue; 123*a9643ea8Slogwang 124*a9643ea8Slogwang new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue), 125*a9643ea8Slogwang RTE_CACHE_LINE_SIZE, 126*a9643ea8Slogwang rte_socket_id()); 127*a9643ea8Slogwang if (new_queue == NULL) 128*a9643ea8Slogwang return NULL; 129*a9643ea8Slogwang 130*a9643ea8Slogwang /* allocated stub node */ 131*a9643ea8Slogwang stub = _qnode_alloc(); 132*a9643ea8Slogwang RTE_ASSERT(stub); 133*a9643ea8Slogwang 134*a9643ea8Slogwang if (name != NULL) 135*a9643ea8Slogwang strncpy(new_queue->name, name, sizeof(new_queue->name)); 136*a9643ea8Slogwang new_queue->name[sizeof(new_queue->name)-1] = 0; 137*a9643ea8Slogwang 138*a9643ea8Slogwang /* initialize queue as empty */ 139*a9643ea8Slogwang stub->next = NULL; 140*a9643ea8Slogwang new_queue->head = stub; 141*a9643ea8Slogwang new_queue->tail = stub; 142*a9643ea8Slogwang 143*a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, rd); 144*a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, wr); 145*a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, size); 146*a9643ea8Slogwang 147*a9643ea8Slogwang return new_queue; 148*a9643ea8Slogwang } 149*a9643ea8Slogwang 150*a9643ea8Slogwang /** 151*a9643ea8Slogwang * Return true if the queue is empty 152*a9643ea8Slogwang */ 153*a9643ea8Slogwang static inline int __attribute__ ((always_inline)) 154*a9643ea8Slogwang _lthread_queue_empty(struct lthread_queue *q) 155*a9643ea8Slogwang { 156*a9643ea8Slogwang return q->tail == q->head; 157*a9643ea8Slogwang } 158*a9643ea8Slogwang 159*a9643ea8Slogwang 160*a9643ea8Slogwang 161*a9643ea8Slogwang /** 162*a9643ea8Slogwang * Destroy a queue 163*a9643ea8Slogwang * fail if queue is not empty 164*a9643ea8Slogwang */ 165*a9643ea8Slogwang static inline int _lthread_queue_destroy(struct lthread_queue *q) 166*a9643ea8Slogwang { 167*a9643ea8Slogwang if (q == NULL) 168*a9643ea8Slogwang return -1; 169*a9643ea8Slogwang 170*a9643ea8Slogwang if (!_lthread_queue_empty(q)) 171*a9643ea8Slogwang return -1; 172*a9643ea8Slogwang 173*a9643ea8Slogwang _qnode_free(q->head); 174*a9643ea8Slogwang rte_free(q); 175*a9643ea8Slogwang return 0; 176*a9643ea8Slogwang } 177*a9643ea8Slogwang 178*a9643ea8Slogwang RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched); 179*a9643ea8Slogwang 180*a9643ea8Slogwang /* 181*a9643ea8Slogwang * Insert a node into a queue 182*a9643ea8Slogwang * this implementation is multi producer safe 183*a9643ea8Slogwang */ 184*a9643ea8Slogwang static inline struct qnode *__attribute__ ((always_inline)) 185*a9643ea8Slogwang _lthread_queue_insert_mp(struct lthread_queue 186*a9643ea8Slogwang *q, void *data) 187*a9643ea8Slogwang { 188*a9643ea8Slogwang struct qnode *prev; 189*a9643ea8Slogwang struct qnode *n = _qnode_alloc(); 190*a9643ea8Slogwang 191*a9643ea8Slogwang if (n == NULL) 192*a9643ea8Slogwang return NULL; 193*a9643ea8Slogwang 194*a9643ea8Slogwang /* set object in node */ 195*a9643ea8Slogwang n->data = data; 196*a9643ea8Slogwang n->next = NULL; 197*a9643ea8Slogwang 198*a9643ea8Slogwang /* this is an MPSC method, perform a locked update */ 199*a9643ea8Slogwang prev = n; 200*a9643ea8Slogwang prev = 201*a9643ea8Slogwang (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head, 202*a9643ea8Slogwang (uint64_t) prev); 203*a9643ea8Slogwang /* there is a window of inconsistency until prev next is set, 204*a9643ea8Slogwang * which is why remove must retry 205*a9643ea8Slogwang */ 206*a9643ea8Slogwang prev->next = n; 207*a9643ea8Slogwang 208*a9643ea8Slogwang DIAG_COUNT_INC(q, wr); 209*a9643ea8Slogwang DIAG_COUNT_INC(q, size); 210*a9643ea8Slogwang 211*a9643ea8Slogwang return n; 212*a9643ea8Slogwang } 213*a9643ea8Slogwang 214*a9643ea8Slogwang /* 215*a9643ea8Slogwang * Insert an node into a queue in single producer mode 216*a9643ea8Slogwang * this implementation is NOT mult producer safe 217*a9643ea8Slogwang */ 218*a9643ea8Slogwang static inline struct qnode *__attribute__ ((always_inline)) 219*a9643ea8Slogwang _lthread_queue_insert_sp(struct lthread_queue 220*a9643ea8Slogwang *q, void *data) 221*a9643ea8Slogwang { 222*a9643ea8Slogwang /* allocate a queue node */ 223*a9643ea8Slogwang struct qnode *prev; 224*a9643ea8Slogwang struct qnode *n = _qnode_alloc(); 225*a9643ea8Slogwang 226*a9643ea8Slogwang if (n == NULL) 227*a9643ea8Slogwang return NULL; 228*a9643ea8Slogwang 229*a9643ea8Slogwang /* set data in node */ 230*a9643ea8Slogwang n->data = data; 231*a9643ea8Slogwang n->next = NULL; 232*a9643ea8Slogwang 233*a9643ea8Slogwang /* this is an SPSC method, no need for locked exchange operation */ 234*a9643ea8Slogwang prev = q->head; 235*a9643ea8Slogwang prev->next = q->head = n; 236*a9643ea8Slogwang 237*a9643ea8Slogwang DIAG_COUNT_INC(q, wr); 238*a9643ea8Slogwang DIAG_COUNT_INC(q, size); 239*a9643ea8Slogwang 240*a9643ea8Slogwang return n; 241*a9643ea8Slogwang } 242*a9643ea8Slogwang 243*a9643ea8Slogwang /* 244*a9643ea8Slogwang * Remove a node from a queue 245*a9643ea8Slogwang */ 246*a9643ea8Slogwang static inline void *__attribute__ ((always_inline)) 247*a9643ea8Slogwang _lthread_queue_poll(struct lthread_queue *q) 248*a9643ea8Slogwang { 249*a9643ea8Slogwang void *data = NULL; 250*a9643ea8Slogwang struct qnode *tail = q->tail; 251*a9643ea8Slogwang struct qnode *next = (struct qnode *)tail->next; 252*a9643ea8Slogwang /* 253*a9643ea8Slogwang * There is a small window of inconsistency between producer and 254*a9643ea8Slogwang * consumer whereby the queue may appear empty if consumer and 255*a9643ea8Slogwang * producer access it at the same time. 256*a9643ea8Slogwang * The consumer must handle this by retrying 257*a9643ea8Slogwang */ 258*a9643ea8Slogwang 259*a9643ea8Slogwang if (likely(next != NULL)) { 260*a9643ea8Slogwang q->tail = next; 261*a9643ea8Slogwang tail->data = next->data; 262*a9643ea8Slogwang data = tail->data; 263*a9643ea8Slogwang 264*a9643ea8Slogwang /* free the node */ 265*a9643ea8Slogwang _qnode_free(tail); 266*a9643ea8Slogwang 267*a9643ea8Slogwang DIAG_COUNT_INC(q, rd); 268*a9643ea8Slogwang DIAG_COUNT_DEC(q, size); 269*a9643ea8Slogwang return data; 270*a9643ea8Slogwang } 271*a9643ea8Slogwang return NULL; 272*a9643ea8Slogwang } 273*a9643ea8Slogwang 274*a9643ea8Slogwang /* 275*a9643ea8Slogwang * Remove a node from a queue 276*a9643ea8Slogwang */ 277*a9643ea8Slogwang static inline void *__attribute__ ((always_inline)) 278*a9643ea8Slogwang _lthread_queue_remove(struct lthread_queue *q) 279*a9643ea8Slogwang { 280*a9643ea8Slogwang void *data = NULL; 281*a9643ea8Slogwang 282*a9643ea8Slogwang /* 283*a9643ea8Slogwang * There is a small window of inconsistency between producer and 284*a9643ea8Slogwang * consumer whereby the queue may appear empty if consumer and 285*a9643ea8Slogwang * producer access it at the same time. We handle this by retrying 286*a9643ea8Slogwang */ 287*a9643ea8Slogwang do { 288*a9643ea8Slogwang data = _lthread_queue_poll(q); 289*a9643ea8Slogwang 290*a9643ea8Slogwang if (likely(data != NULL)) { 291*a9643ea8Slogwang 292*a9643ea8Slogwang DIAG_COUNT_INC(q, rd); 293*a9643ea8Slogwang DIAG_COUNT_DEC(q, size); 294*a9643ea8Slogwang return data; 295*a9643ea8Slogwang } 296*a9643ea8Slogwang rte_compiler_barrier(); 297*a9643ea8Slogwang } while (unlikely(!_lthread_queue_empty(q))); 298*a9643ea8Slogwang return NULL; 299*a9643ea8Slogwang } 300*a9643ea8Slogwang 301*a9643ea8Slogwang 302*a9643ea8Slogwang #endif /* LTHREAD_QUEUE_H_ */ 303