1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * Copyright 2015 Intel Corporation. 4 * Copyright 2010-2011 Dmitry Vyukov 5 */ 6 7 #ifndef LTHREAD_QUEUE_H_ 8 #define LTHREAD_QUEUE_H_ 9 10 #ifdef __cplusplus 11 extern "C" { 12 #endif 13 14 #include <string.h> 15 16 #include <rte_prefetch.h> 17 #include <rte_per_lcore.h> 18 19 #include "lthread_int.h" 20 #include "lthread.h" 21 #include "lthread_diag.h" 22 #include "lthread_pool.h" 23 24 struct lthread_queue; 25 26 /* 27 * This file implements an unbounded FIFO queue based on a lock free 28 * linked list. 29 * 30 * The queue is non-intrusive in that it uses intermediate nodes, and does 31 * not require these nodes to be inserted into the object being placed 32 * in the queue. 33 * 34 * This is slightly more efficient than the very similar queue in lthread_pool 35 * in that it does not have to swing a stub node as the queue becomes empty. 36 * 37 * The queue access functions allocate and free intermediate node 38 * transparently from/to a per scheduler pool ( see lthread_pool.h ). 39 * 40 * The queue provides both MPSC and SPSC insert methods 41 */ 42 43 /* 44 * define a queue of lthread nodes 45 */ 46 struct lthread_queue { 47 struct qnode *head; 48 struct qnode *tail __rte_cache_aligned; 49 struct lthread_queue *p; 50 char name[LT_MAX_NAME_SIZE]; 51 52 DIAG_COUNT_DEFINE(rd); 53 DIAG_COUNT_DEFINE(wr); 54 DIAG_COUNT_DEFINE(size); 55 56 } __rte_cache_aligned; 57 58 59 60 static inline struct lthread_queue * 61 _lthread_queue_create(const char *name) 62 { 63 struct qnode *stub; 64 struct lthread_queue *new_queue; 65 66 new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue), 67 RTE_CACHE_LINE_SIZE, 68 rte_socket_id()); 69 if (new_queue == NULL) 70 return NULL; 71 72 /* allocated stub node */ 73 stub = _qnode_alloc(); 74 RTE_ASSERT(stub); 75 76 if (name != NULL) 77 strncpy(new_queue->name, name, sizeof(new_queue->name)); 78 new_queue->name[sizeof(new_queue->name)-1] = 0; 79 80 /* initialize queue as empty */ 81 stub->next = NULL; 82 new_queue->head = stub; 83 new_queue->tail = stub; 84 85 DIAG_COUNT_INIT(new_queue, rd); 86 DIAG_COUNT_INIT(new_queue, wr); 87 DIAG_COUNT_INIT(new_queue, size); 88 89 return new_queue; 90 } 91 92 /** 93 * Return true if the queue is empty 94 */ 95 static __rte_always_inline int 96 _lthread_queue_empty(struct lthread_queue *q) 97 { 98 return q->tail == q->head; 99 } 100 101 102 103 /** 104 * Destroy a queue 105 * fail if queue is not empty 106 */ 107 static inline int _lthread_queue_destroy(struct lthread_queue *q) 108 { 109 if (q == NULL) 110 return -1; 111 112 if (!_lthread_queue_empty(q)) 113 return -1; 114 115 _qnode_free(q->head); 116 rte_free(q); 117 return 0; 118 } 119 120 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched); 121 122 /* 123 * Insert a node into a queue 124 * this implementation is multi producer safe 125 */ 126 static __rte_always_inline struct qnode * 127 _lthread_queue_insert_mp(struct lthread_queue 128 *q, void *data) 129 { 130 struct qnode *prev; 131 struct qnode *n = _qnode_alloc(); 132 133 if (n == NULL) 134 return NULL; 135 136 /* set object in node */ 137 n->data = data; 138 n->next = NULL; 139 140 /* this is an MPSC method, perform a locked update */ 141 prev = n; 142 prev = 143 (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head, 144 (uint64_t) prev); 145 /* there is a window of inconsistency until prev next is set, 146 * which is why remove must retry 147 */ 148 prev->next = n; 149 150 DIAG_COUNT_INC(q, wr); 151 DIAG_COUNT_INC(q, size); 152 153 return n; 154 } 155 156 /* 157 * Insert an node into a queue in single producer mode 158 * this implementation is NOT mult producer safe 159 */ 160 static __rte_always_inline struct qnode * 161 _lthread_queue_insert_sp(struct lthread_queue 162 *q, void *data) 163 { 164 /* allocate a queue node */ 165 struct qnode *prev; 166 struct qnode *n = _qnode_alloc(); 167 168 if (n == NULL) 169 return NULL; 170 171 /* set data in node */ 172 n->data = data; 173 n->next = NULL; 174 175 /* this is an SPSC method, no need for locked exchange operation */ 176 prev = q->head; 177 prev->next = q->head = n; 178 179 DIAG_COUNT_INC(q, wr); 180 DIAG_COUNT_INC(q, size); 181 182 return n; 183 } 184 185 /* 186 * Remove a node from a queue 187 */ 188 static __rte_always_inline void * 189 _lthread_queue_poll(struct lthread_queue *q) 190 { 191 void *data = NULL; 192 struct qnode *tail = q->tail; 193 struct qnode *next = (struct qnode *)tail->next; 194 /* 195 * There is a small window of inconsistency between producer and 196 * consumer whereby the queue may appear empty if consumer and 197 * producer access it at the same time. 198 * The consumer must handle this by retrying 199 */ 200 201 if (likely(next != NULL)) { 202 q->tail = next; 203 tail->data = next->data; 204 data = tail->data; 205 206 /* free the node */ 207 _qnode_free(tail); 208 209 DIAG_COUNT_INC(q, rd); 210 DIAG_COUNT_DEC(q, size); 211 return data; 212 } 213 return NULL; 214 } 215 216 /* 217 * Remove a node from a queue 218 */ 219 static __rte_always_inline void * 220 _lthread_queue_remove(struct lthread_queue *q) 221 { 222 void *data = NULL; 223 224 /* 225 * There is a small window of inconsistency between producer and 226 * consumer whereby the queue may appear empty if consumer and 227 * producer access it at the same time. We handle this by retrying 228 */ 229 do { 230 data = _lthread_queue_poll(q); 231 232 if (likely(data != NULL)) { 233 234 DIAG_COUNT_INC(q, rd); 235 DIAG_COUNT_DEC(q, size); 236 return data; 237 } 238 rte_compiler_barrier(); 239 } while (unlikely(!_lthread_queue_empty(q))); 240 return NULL; 241 } 242 243 #ifdef __cplusplus 244 } 245 #endif 246 247 #endif /* LTHREAD_QUEUE_H_ */ 248