/*-
 * Copyright (c) 2007-2009 Kip Macy <[email protected]>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#if defined(INVARIANTS) && !defined(DEBUG_BUFRING)
#define	DEBUG_BUFRING 1
#endif

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * multi-producer safe lock-free ring buffer enqueue
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Scan the occupied region for a duplicate of buf to catch
	 * double enqueues.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
			/*
			 * The ring appears full; re-read both indices
			 * after a read barrier to confirm that before
			 * counting the enqueue as a drop.
			 */
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
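
/*
 * Example: a sketch of a producer-side transmit path.  The softc, its
 * "sc_br" ring field, and xx_transmit() are hypothetical names, not
 * part of this interface.  On ENOBUFS the ring has already accounted
 * the failure in br_drops, so the caller just frees the mbuf:
 *
 *	static int
 *	xx_transmit(struct xx_softc *sc, struct mbuf *m)
 *	{
 *		int error;
 *
 *		error = buf_ring_enqueue(sc->sc_br, m);
 *		if (error == ENOBUFS)
 *			m_freem(m);
 *		return (error);
 *	}
 *
 * Any number of cores may call buf_ring_enqueue() concurrently; no
 * producer-side lock is needed.
 */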

/*
 * multi-consumer safe dequeue
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	void *buf;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete.
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require a
	 * load_acq operation, but this change was extensively tested and
	 * confirmed to work. To be reviewed once again in FreeBSD-12.
	 *
	 * This prevents the following situation:
	 *
	 * Core(0) - buf_ring_enqueue()                   Core(1) - buf_ring_dequeue_sc()
	 * --------------------------------------------   --------------------------------------
	 *
	 *                                                cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
	 *                                                buf = br->br_ring[cons_head];  <see <1>>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(&br->br_prod_tail, ...);
	 *                                                prod_tail = br->br_prod_tail;
	 *                                                if (cons_head == prod_tail)
	 *                                                        return (NULL);
	 *                                                <condition is false and code uses
	 *                                                 invalid(old) buf>
	 *
	 * <1> The load (on core 1) from br->br_ring[cons_head] can be
	 * reordered (speculatively read) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
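
/*
 * Example: a sketch of the single-consumer drain loop this function
 * expects.  The softc, its "sc_tx_mtx" lock, and xx_encap() are
 * hypothetical names; the point is that the dequeue side is
 * serialized by the driver's tx queue lock:
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&sc->sc_tx_mtx);
 *	while ((m = buf_ring_dequeue_sc(sc->sc_br)) != NULL)
 *		xx_encap(sc, m);
 *	mtx_unlock(&sc->sc_tx_mtx);
 */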

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	br->br_cons_tail = cons_next;
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what is in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we have to do a multi-consumer version we will need
 * the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	KASSERT(br->br_cons_head != br->br_prod_tail,
	    ("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head] = new;
}

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier here
	 * because we control cons, and tail is at worst a lagging
	 * indicator, so in the worst case we might return NULL
	 * immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}
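
/*
 * Example: a sketch of the peek/advance/putback pattern described
 * above, run under the consumer lock.  xx_encap() is a hypothetical
 * encapsulation routine that may fail and may replace the mbuf
 * (e.g. via m_pullup()):
 *
 *	struct mbuf *m;
 *
 *	while ((m = buf_ring_peek(sc->sc_br)) != NULL) {
 *		if (xx_encap(sc, &m) != 0) {
 *			if (m != NULL)
 *				buf_ring_putback_sc(sc->sc_br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(sc->sc_br);
 *	}
 *
 * The entry is only consumed (buf_ring_advance_sc()) once the
 * transmit attempt succeeds; on failure the (possibly modified)
 * buffer is stored back at the head of the ring.
 */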

/*
 * single-consumer peek; under DEBUG_BUFRING the slot is also cleared
 * so that stale pointers are caught
 * use where it is protected by a lock, as with buf_ring_dequeue_sc()
 */
static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier here
	 * because we control cons, and tail is at worst a lagging
	 * indicator, so in the worst case we might return NULL
	 * immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);

#endif /* _SYS_BUF_RING_H_ */
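
/*
 * Example lifecycle: a sketch of allocating and freeing a ring from a
 * hypothetical driver attach/detach path.  The count must be a power
 * of two, since indices are wrapped with a mask; M_DEVBUF and the
 * softc fields are illustrative.  The mutex passed in is the one
 * checked by the DEBUG_BUFRING single-consumer assertions:
 *
 *	sc->sc_br = buf_ring_alloc(4096, M_DEVBUF, M_WAITOK,
 *	    &sc->sc_tx_mtx);
 *	...
 *	buf_ring_free(sc->sc_br, M_DEVBUF);
 */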