1a9643ea8Slogwang /* 2a9643ea8Slogwang *- 3a9643ea8Slogwang * BSD LICENSE 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright(c) 2015 Intel Corporation. All rights reserved. 6a9643ea8Slogwang * All rights reserved. 7a9643ea8Slogwang * 8a9643ea8Slogwang * Redistribution and use in source and binary forms, with or without 9a9643ea8Slogwang * modification, are permitted provided that the following conditions 10a9643ea8Slogwang * are met: 11a9643ea8Slogwang * 12a9643ea8Slogwang * * Redistributions of source code must retain the above copyright 13a9643ea8Slogwang * notice, this list of conditions and the following disclaimer. 14a9643ea8Slogwang * * Redistributions in binary form must reproduce the above copyright 15a9643ea8Slogwang * notice, this list of conditions and the following disclaimer in 16a9643ea8Slogwang * the documentation and/or other materials provided with the 17a9643ea8Slogwang * distribution. 18a9643ea8Slogwang * * Neither the name of Intel Corporation nor the names of its 19a9643ea8Slogwang * contributors may be used to endorse or promote products derived 20a9643ea8Slogwang * from this software without specific prior written permission. 21a9643ea8Slogwang * 22a9643ea8Slogwang * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23a9643ea8Slogwang * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24a9643ea8Slogwang * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 25a9643ea8Slogwang * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 26a9643ea8Slogwang * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 27a9643ea8Slogwang * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 28a9643ea8Slogwang * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29a9643ea8Slogwang * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 30a9643ea8Slogwang * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 31a9643ea8Slogwang * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 32a9643ea8Slogwang * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 33a9643ea8Slogwang */ 34a9643ea8Slogwang 35a9643ea8Slogwang /* 36a9643ea8Slogwang * Some portions of this software is derived from the producer 37a9643ea8Slogwang * consumer queues described by Dmitry Vyukov and published here 38a9643ea8Slogwang * http://www.1024cores.net 39a9643ea8Slogwang * 40a9643ea8Slogwang * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. 41a9643ea8Slogwang * Redistribution and use in source and binary forms, with or without 42a9643ea8Slogwang * modification, are permitted provided that the following conditions 43a9643ea8Slogwang * are met: 44a9643ea8Slogwang * 45a9643ea8Slogwang * 1. Redistributions of source code must retain the above copyright notice, 46a9643ea8Slogwang * this list of conditions and the following disclaimer. 47a9643ea8Slogwang * 48a9643ea8Slogwang * 2. Redistributions in binary form must reproduce the above copyright notice, 49a9643ea8Slogwang * this list of conditions and the following disclaimer in the documentation 50a9643ea8Slogwang * and/or other materials provided with the distribution. 51a9643ea8Slogwang * 52a9643ea8Slogwang * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" 53a9643ea8Slogwang * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 54a9643ea8Slogwang * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 55a9643ea8Slogwang * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS 56a9643ea8Slogwang * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, 57a9643ea8Slogwang * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT 58a9643ea8Slogwang * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 59a9643ea8Slogwang * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 60a9643ea8Slogwang * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 61a9643ea8Slogwang * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 62a9643ea8Slogwang * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 63a9643ea8Slogwang * 64a9643ea8Slogwang * The views and conclusions contained in the software and documentation are 65a9643ea8Slogwang * those of the authors and should not be interpreted as representing official 66a9643ea8Slogwang * policies, either expressed or implied, of Dmitry Vyukov. 67a9643ea8Slogwang */ 68a9643ea8Slogwang 69a9643ea8Slogwang #ifndef LTHREAD_QUEUE_H_ 70a9643ea8Slogwang #define LTHREAD_QUEUE_H_ 71a9643ea8Slogwang 72*2bfe3f2eSlogwang #ifdef __cplusplus 73*2bfe3f2eSlogwang extern "C" { 74*2bfe3f2eSlogwang #endif 75*2bfe3f2eSlogwang 76a9643ea8Slogwang #include <string.h> 77a9643ea8Slogwang 78a9643ea8Slogwang #include <rte_prefetch.h> 79a9643ea8Slogwang #include <rte_per_lcore.h> 80a9643ea8Slogwang 81a9643ea8Slogwang #include "lthread_int.h" 82a9643ea8Slogwang #include "lthread.h" 83a9643ea8Slogwang #include "lthread_diag.h" 84a9643ea8Slogwang #include "lthread_pool.h" 85a9643ea8Slogwang 86a9643ea8Slogwang struct lthread_queue; 87a9643ea8Slogwang 88a9643ea8Slogwang /* 89a9643ea8Slogwang * This file implements an unbounded FIFO queue based on a lock free 90a9643ea8Slogwang * linked list. 91a9643ea8Slogwang * 92a9643ea8Slogwang * The queue is non-intrusive in that it uses intermediate nodes, and does 93a9643ea8Slogwang * not require these nodes to be inserted into the object being placed 94a9643ea8Slogwang * in the queue. 95a9643ea8Slogwang * 96a9643ea8Slogwang * This is slightly more efficient than the very similar queue in lthread_pool 97a9643ea8Slogwang * in that it does not have to swing a stub node as the queue becomes empty. 98a9643ea8Slogwang * 99a9643ea8Slogwang * The queue access functions allocate and free intermediate node 100a9643ea8Slogwang * transparently from/to a per scheduler pool ( see lthread_pool.h ). 101a9643ea8Slogwang * 102a9643ea8Slogwang * The queue provides both MPSC and SPSC insert methods 103a9643ea8Slogwang */ 104a9643ea8Slogwang 105a9643ea8Slogwang /* 106a9643ea8Slogwang * define a queue of lthread nodes 107a9643ea8Slogwang */ 108a9643ea8Slogwang struct lthread_queue { 109a9643ea8Slogwang struct qnode *head; 110a9643ea8Slogwang struct qnode *tail __rte_cache_aligned; 111a9643ea8Slogwang struct lthread_queue *p; 112a9643ea8Slogwang char name[LT_MAX_NAME_SIZE]; 113a9643ea8Slogwang 114a9643ea8Slogwang DIAG_COUNT_DEFINE(rd); 115a9643ea8Slogwang DIAG_COUNT_DEFINE(wr); 116a9643ea8Slogwang DIAG_COUNT_DEFINE(size); 117a9643ea8Slogwang 118a9643ea8Slogwang } __rte_cache_aligned; 119a9643ea8Slogwang 120a9643ea8Slogwang 121a9643ea8Slogwang 122a9643ea8Slogwang static inline struct lthread_queue * 123a9643ea8Slogwang _lthread_queue_create(const char *name) 124a9643ea8Slogwang { 125a9643ea8Slogwang struct qnode *stub; 126a9643ea8Slogwang struct lthread_queue *new_queue; 127a9643ea8Slogwang 128a9643ea8Slogwang new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue), 129a9643ea8Slogwang RTE_CACHE_LINE_SIZE, 130a9643ea8Slogwang rte_socket_id()); 131a9643ea8Slogwang if (new_queue == NULL) 132a9643ea8Slogwang return NULL; 133a9643ea8Slogwang 134a9643ea8Slogwang /* allocated stub node */ 135a9643ea8Slogwang stub = _qnode_alloc(); 136a9643ea8Slogwang RTE_ASSERT(stub); 137a9643ea8Slogwang 138a9643ea8Slogwang if (name != NULL) 139a9643ea8Slogwang strncpy(new_queue->name, name, sizeof(new_queue->name)); 140a9643ea8Slogwang new_queue->name[sizeof(new_queue->name)-1] = 0; 141a9643ea8Slogwang 142a9643ea8Slogwang /* initialize queue as empty */ 143a9643ea8Slogwang stub->next = NULL; 144a9643ea8Slogwang new_queue->head = stub; 145a9643ea8Slogwang new_queue->tail = stub; 146a9643ea8Slogwang 147a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, rd); 148a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, wr); 149a9643ea8Slogwang DIAG_COUNT_INIT(new_queue, size); 150a9643ea8Slogwang 151a9643ea8Slogwang return new_queue; 152a9643ea8Slogwang } 153a9643ea8Slogwang 154a9643ea8Slogwang /** 155a9643ea8Slogwang * Return true if the queue is empty 156a9643ea8Slogwang */ 157*2bfe3f2eSlogwang static __rte_always_inline int 158a9643ea8Slogwang _lthread_queue_empty(struct lthread_queue *q) 159a9643ea8Slogwang { 160a9643ea8Slogwang return q->tail == q->head; 161a9643ea8Slogwang } 162a9643ea8Slogwang 163a9643ea8Slogwang 164a9643ea8Slogwang 165a9643ea8Slogwang /** 166a9643ea8Slogwang * Destroy a queue 167a9643ea8Slogwang * fail if queue is not empty 168a9643ea8Slogwang */ 169a9643ea8Slogwang static inline int _lthread_queue_destroy(struct lthread_queue *q) 170a9643ea8Slogwang { 171a9643ea8Slogwang if (q == NULL) 172a9643ea8Slogwang return -1; 173a9643ea8Slogwang 174a9643ea8Slogwang if (!_lthread_queue_empty(q)) 175a9643ea8Slogwang return -1; 176a9643ea8Slogwang 177a9643ea8Slogwang _qnode_free(q->head); 178a9643ea8Slogwang rte_free(q); 179a9643ea8Slogwang return 0; 180a9643ea8Slogwang } 181a9643ea8Slogwang 182a9643ea8Slogwang RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched); 183a9643ea8Slogwang 184a9643ea8Slogwang /* 185a9643ea8Slogwang * Insert a node into a queue 186a9643ea8Slogwang * this implementation is multi producer safe 187a9643ea8Slogwang */ 188*2bfe3f2eSlogwang static __rte_always_inline struct qnode * 189a9643ea8Slogwang _lthread_queue_insert_mp(struct lthread_queue 190a9643ea8Slogwang *q, void *data) 191a9643ea8Slogwang { 192a9643ea8Slogwang struct qnode *prev; 193a9643ea8Slogwang struct qnode *n = _qnode_alloc(); 194a9643ea8Slogwang 195a9643ea8Slogwang if (n == NULL) 196a9643ea8Slogwang return NULL; 197a9643ea8Slogwang 198a9643ea8Slogwang /* set object in node */ 199a9643ea8Slogwang n->data = data; 200a9643ea8Slogwang n->next = NULL; 201a9643ea8Slogwang 202a9643ea8Slogwang /* this is an MPSC method, perform a locked update */ 203a9643ea8Slogwang prev = n; 204a9643ea8Slogwang prev = 205a9643ea8Slogwang (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head, 206a9643ea8Slogwang (uint64_t) prev); 207a9643ea8Slogwang /* there is a window of inconsistency until prev next is set, 208a9643ea8Slogwang * which is why remove must retry 209a9643ea8Slogwang */ 210a9643ea8Slogwang prev->next = n; 211a9643ea8Slogwang 212a9643ea8Slogwang DIAG_COUNT_INC(q, wr); 213a9643ea8Slogwang DIAG_COUNT_INC(q, size); 214a9643ea8Slogwang 215a9643ea8Slogwang return n; 216a9643ea8Slogwang } 217a9643ea8Slogwang 218a9643ea8Slogwang /* 219a9643ea8Slogwang * Insert an node into a queue in single producer mode 220a9643ea8Slogwang * this implementation is NOT mult producer safe 221a9643ea8Slogwang */ 222*2bfe3f2eSlogwang static __rte_always_inline struct qnode * 223a9643ea8Slogwang _lthread_queue_insert_sp(struct lthread_queue 224a9643ea8Slogwang *q, void *data) 225a9643ea8Slogwang { 226a9643ea8Slogwang /* allocate a queue node */ 227a9643ea8Slogwang struct qnode *prev; 228a9643ea8Slogwang struct qnode *n = _qnode_alloc(); 229a9643ea8Slogwang 230a9643ea8Slogwang if (n == NULL) 231a9643ea8Slogwang return NULL; 232a9643ea8Slogwang 233a9643ea8Slogwang /* set data in node */ 234a9643ea8Slogwang n->data = data; 235a9643ea8Slogwang n->next = NULL; 236a9643ea8Slogwang 237a9643ea8Slogwang /* this is an SPSC method, no need for locked exchange operation */ 238a9643ea8Slogwang prev = q->head; 239a9643ea8Slogwang prev->next = q->head = n; 240a9643ea8Slogwang 241a9643ea8Slogwang DIAG_COUNT_INC(q, wr); 242a9643ea8Slogwang DIAG_COUNT_INC(q, size); 243a9643ea8Slogwang 244a9643ea8Slogwang return n; 245a9643ea8Slogwang } 246a9643ea8Slogwang 247a9643ea8Slogwang /* 248a9643ea8Slogwang * Remove a node from a queue 249a9643ea8Slogwang */ 250*2bfe3f2eSlogwang static __rte_always_inline void * 251a9643ea8Slogwang _lthread_queue_poll(struct lthread_queue *q) 252a9643ea8Slogwang { 253a9643ea8Slogwang void *data = NULL; 254a9643ea8Slogwang struct qnode *tail = q->tail; 255a9643ea8Slogwang struct qnode *next = (struct qnode *)tail->next; 256a9643ea8Slogwang /* 257a9643ea8Slogwang * There is a small window of inconsistency between producer and 258a9643ea8Slogwang * consumer whereby the queue may appear empty if consumer and 259a9643ea8Slogwang * producer access it at the same time. 260a9643ea8Slogwang * The consumer must handle this by retrying 261a9643ea8Slogwang */ 262a9643ea8Slogwang 263a9643ea8Slogwang if (likely(next != NULL)) { 264a9643ea8Slogwang q->tail = next; 265a9643ea8Slogwang tail->data = next->data; 266a9643ea8Slogwang data = tail->data; 267a9643ea8Slogwang 268a9643ea8Slogwang /* free the node */ 269a9643ea8Slogwang _qnode_free(tail); 270a9643ea8Slogwang 271a9643ea8Slogwang DIAG_COUNT_INC(q, rd); 272a9643ea8Slogwang DIAG_COUNT_DEC(q, size); 273a9643ea8Slogwang return data; 274a9643ea8Slogwang } 275a9643ea8Slogwang return NULL; 276a9643ea8Slogwang } 277a9643ea8Slogwang 278a9643ea8Slogwang /* 279a9643ea8Slogwang * Remove a node from a queue 280a9643ea8Slogwang */ 281*2bfe3f2eSlogwang static __rte_always_inline void * 282a9643ea8Slogwang _lthread_queue_remove(struct lthread_queue *q) 283a9643ea8Slogwang { 284a9643ea8Slogwang void *data = NULL; 285a9643ea8Slogwang 286a9643ea8Slogwang /* 287a9643ea8Slogwang * There is a small window of inconsistency between producer and 288a9643ea8Slogwang * consumer whereby the queue may appear empty if consumer and 289a9643ea8Slogwang * producer access it at the same time. We handle this by retrying 290a9643ea8Slogwang */ 291a9643ea8Slogwang do { 292a9643ea8Slogwang data = _lthread_queue_poll(q); 293a9643ea8Slogwang 294a9643ea8Slogwang if (likely(data != NULL)) { 295a9643ea8Slogwang 296a9643ea8Slogwang DIAG_COUNT_INC(q, rd); 297a9643ea8Slogwang DIAG_COUNT_DEC(q, size); 298a9643ea8Slogwang return data; 299a9643ea8Slogwang } 300a9643ea8Slogwang rte_compiler_barrier(); 301a9643ea8Slogwang } while (unlikely(!_lthread_queue_empty(q))); 302a9643ea8Slogwang return NULL; 303a9643ea8Slogwang } 304a9643ea8Slogwang 305*2bfe3f2eSlogwang #ifdef __cplusplus 306*2bfe3f2eSlogwang } 307*2bfe3f2eSlogwang #endif 308a9643ea8Slogwang 309a9643ea8Slogwang #endif /* LTHREAD_QUEUE_H_ */ 310