/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <[email protected]>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>
#include <net/mp_ring.h>

union ring_state {
        struct {
                uint16_t pidx_head;
                uint16_t pidx_tail;
                uint16_t cidx;
                uint16_t flags;
        };
        uint64_t state;
};

enum {
        IDLE = 0,       /* consumer ran to completion, nothing more to do. */
        BUSY,           /* consumer is running already, or will be shortly. */
        STALLED,        /* consumer stopped due to lack of resources. */
        ABDICATED,      /* consumer stopped even though there was work to be
                           done because it wants another thread to take over. */
};
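
/*
 * Returns the number of free slots between the producer head and the consumer
 * index.  One slot is always kept unused so that a full ring can be told apart
 * from an empty one.
 */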
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
        uint16_t x = r->size - 1;

        if (s.cidx == s.pidx_head)
                return (x);
        else if (s.cidx > s.pidx_head)
                return (s.cidx - s.pidx_head - 1);
        else
                return (x - s.pidx_head + s.cidx);
}

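/*
 * Advance idx by n, wrapping around at the ring size.
 */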
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
        int x = r->size - idx;

        MPASS(x > 0);
        return (x > n ? idx + n : n - x);
}

/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

        if (s.cidx == s.pidx_tail)
                return (IDLE);
        else if (abdicate && s.pidx_tail != s.pidx_head)
                return (ABDICATED);

        return (BUSY);
}

#ifdef MP_RING_NO_64BIT_ATOMICS
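/*
 * Lock-based variant of the consumer loop, used when 64-bit atomics are not
 * available.  The caller holds the ring lock and passes in a state with work
 * to do (flags == BUSY and cidx != pidx_tail).
 */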
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {
                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        os.state = ns.state = r->state;
                        ns.cidx = cidx;
                        ns.flags = STALLED;
                        r->state = ns.state;
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;

                os.state = ns.state = r->state;
                ns.cidx = cidx;
                ns.flags = state_to_flags(ns, total >= budget);
                r->state = ns.state;

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The ring lock, held across this entire function, guarantees
                 * visibility of items associated with any pidx change that we
                 * notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
        union ring_state ns;
        int n, pending, total;
        uint16_t cidx = os.cidx;
        uint16_t pidx = os.pidx_tail;

        MPASS(os.flags == BUSY);
        MPASS(cidx != pidx);

        if (prev == IDLE)
                counter_u64_add(r->starts, 1);
        pending = 0;
        total = 0;

        while (cidx != pidx) {
                /* Items from cidx to pidx are available for consumption. */
                n = r->drain(r, cidx, pidx);
                if (n == 0) {
                        critical_enter();
                        os.state = r->state;
                        do {
                                ns.state = os.state;
                                ns.cidx = cidx;
                                ns.flags = STALLED;
                        } while (atomic_fcmpset_64(&r->state, &os.state,
                            ns.state) == 0);
                        critical_exit();
                        if (prev != STALLED)
                                counter_u64_add(r->stalls, 1);
                        else if (total > 0) {
                                counter_u64_add(r->restarts, 1);
                                counter_u64_add(r->stalls, 1);
                        }
                        break;
                }
                cidx = increment_idx(r, cidx, n);
                pending += n;
                total += n;

                /*
                 * We update the cidx only if we've caught up with the pidx, the
                 * real cidx is getting too far ahead of the one visible to
                 * everyone else, or we have exceeded our budget.
                 */
                if (cidx != pidx && pending < 64 && total < budget)
                        continue;
                critical_enter();
                os.state = r->state;
                do {
                        ns.state = os.state;
                        ns.cidx = cidx;
                        ns.flags = state_to_flags(ns, total >= budget);
                } while (atomic_fcmpset_acq_64(&r->state, &os.state,
                    ns.state) == 0);
                critical_exit();

                if (ns.flags == ABDICATED)
                        counter_u64_add(r->abdications, 1);
                if (ns.flags != BUSY) {
                        /* Wrong loop exit if we're going to stall. */
                        MPASS(ns.flags != STALLED);
                        if (prev == STALLED) {
                                MPASS(total > 0);
                                counter_u64_add(r->restarts, 1);
                        }
                        break;
                }

                /*
                 * The acquire style atomic above guarantees visibility of items
                 * associated with any pidx change that we notice here.
                 */
                pidx = ns.pidx_tail;
                pending = 0;
        }
}
#endif

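/*
 * Allocate an ifmp_ring with the given number of item slots and set up its
 * drain callbacks and statistics counters.  Returns 0 on success or an errno.
 */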
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
        struct ifmp_ring *r;

        /* All idx are 16b so size can be 65536 at most */
        if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
            can_drain == NULL)
                return (EINVAL);
        *pr = NULL;
        flags &= M_NOWAIT | M_WAITOK;
        MPASS(flags != 0);

        r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
        if (r == NULL)
                return (ENOMEM);
        r->size = size;
        r->cookie = cookie;
        r->mt = mt;
        r->drain = drain;
        r->can_drain = can_drain;
        r->enqueues = counter_u64_alloc(flags);
        r->drops = counter_u64_alloc(flags);
        r->starts = counter_u64_alloc(flags);
        r->stalls = counter_u64_alloc(flags);
        r->restarts = counter_u64_alloc(flags);
        r->abdications = counter_u64_alloc(flags);
        if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
            r->stalls == NULL || r->restarts == NULL ||
            r->abdications == NULL) {
                ifmp_ring_free(r);
                return (ENOMEM);
        }

        *pr = r;
#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
        return (0);
}

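/*
 * Release a ring and its counters.  Safe to call with a NULL or partially
 * constructed ring.
 */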
void
ifmp_ring_free(struct ifmp_ring *r)
{

        if (r == NULL)
                return;

        if (r->enqueues != NULL)
                counter_u64_free(r->enqueues);
        if (r->drops != NULL)
                counter_u64_free(r->drops);
        if (r->starts != NULL)
                counter_u64_free(r->starts);
        if (r->stalls != NULL)
                counter_u64_free(r->stalls);
        if (r->restarts != NULL)
                counter_u64_free(r->restarts);
        if (r->abdications != NULL)
                counter_u64_free(r->abdications);

        free(r, r->mt);
}

/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        mtx_lock(&r->lock);
        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        if (n >= space_available(r, os)) {
                counter_u64_add(r->drops, n);
                MPASS(os.flags != IDLE);
                mtx_unlock(&r->lock);
                if (os.flags == STALLED)
                        ifmp_ring_check_drainage(r, 0);
                return (ENOBUFS);
        }
        ns.state = os.state;
        ns.pidx_head = increment_idx(r, os.pidx_head, n);
        r->state = ns.state;
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The ring lock, held for the duration
         * of this function, guarantees that the items are visible to any
         * thread that sees the updated pidx.
         */
        os.state = ns.state = r->state;
        ns.pidx_tail = pidx_stop;
        if (abdicate) {
                if (os.flags == IDLE)
                        ns.flags = ABDICATED;
        } else
                ns.flags = BUSY;
        r->state = ns.state;
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a
                 * consumer already.
                 */
                if (os.flags != BUSY)
                        drain_ring_locked(r, ns, os.flags, budget);
        }

        mtx_unlock(&r->lock);
        return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
        union ring_state os, ns;
        uint16_t pidx_start, pidx_stop;
        int i;

        MPASS(items != NULL);
        MPASS(n > 0);

        /*
         * Reserve room for the new items.  Our reservation, if successful, is
         * from 'pidx_start' to 'pidx_stop'.
         */
        os.state = r->state;
        for (;;) {
                if (n >= space_available(r, os)) {
                        counter_u64_add(r->drops, n);
                        MPASS(os.flags != IDLE);
                        if (os.flags == STALLED)
                                ifmp_ring_check_drainage(r, 0);
                        return (ENOBUFS);
                }
                ns.state = os.state;
                ns.pidx_head = increment_idx(r, os.pidx_head, n);
                critical_enter();
                if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
                        break;
                critical_exit();
                cpu_spinwait();
        }
        pidx_start = os.pidx_head;
        pidx_stop = ns.pidx_head;

        /*
         * Wait for other producers who got in ahead of us to enqueue their
         * items, one producer at a time.  It is our turn when the ring's
         * pidx_tail reaches the beginning of our reservation (pidx_start).
         */
        while (ns.pidx_tail != pidx_start) {
                cpu_spinwait();
                ns.state = r->state;
        }

        /* Now it is our turn to fill up the area we reserved earlier. */
        i = pidx_start;
        do {
                r->items[i] = *items++;
                if (__predict_false(++i == r->size))
                        i = 0;
        } while (i != pidx_stop);

        /*
         * Update the ring's pidx_tail.  The release style atomic guarantees
         * that the items are visible to any thread that sees the updated pidx.
         */
        os.state = r->state;
        do {
                ns.state = os.state;
                ns.pidx_tail = pidx_stop;
                if (abdicate) {
                        if (os.flags == IDLE)
                                ns.flags = ABDICATED;
                } else
                        ns.flags = BUSY;
        } while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
        critical_exit();
        counter_u64_add(r->enqueues, n);

        if (!abdicate) {
                /*
                 * Turn into a consumer if some other thread isn't active as a
                 * consumer already.
                 */
                if (os.flags != BUSY)
                        drain_ring_lockless(r, ns, os.flags, budget);
        }

        return (0);
}
#endif

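/*
 * Take over as the consumer if the ring is STALLED or ABDICATED, no producer
 * has a reservation in flight, and there is still work that can be drained.
 */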
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
        union ring_state os, ns;

        os.state = r->state;
        if ((os.flags != STALLED && os.flags != ABDICATED) ||   // Only continue in STALLED and ABDICATED
            os.pidx_head != os.pidx_tail ||                     // Require work to be available
            (os.flags != ABDICATED && r->can_drain(r) == 0))    // Can either drain, or everyone left
                return;

        MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
        ns.state = os.state;
        ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
        mtx_lock(&r->lock);
        if (r->state != os.state) {
                mtx_unlock(&r->lock);
                return;
        }
        r->state = ns.state;
        drain_ring_locked(r, ns, os.flags, budget);
        mtx_unlock(&r->lock);
#else
        /*
         * The acquire style atomic guarantees visibility of items associated
         * with the pidx that we read here.
         */
        if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
                return;

        drain_ring_lockless(r, ns, os.flags, budget);
#endif
}

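/*
 * Zero all of the ring's statistics counters.
 */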
void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

        counter_u64_zero(r->enqueues);
        counter_u64_zero(r->drops);
        counter_u64_zero(r->starts);
        counter_u64_zero(r->stalls);
        counter_u64_zero(r->restarts);
        counter_u64_zero(r->abdications);
}

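/*
 * Returns 1 if the ring is empty and the consumer is idle, 0 otherwise.
 */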
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
            s.flags == IDLE)
                return (1);

        return (0);
}

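/*
 * Returns 1 if the consumer is stalled and no producer has a reservation in
 * flight, 0 otherwise.
 */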
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
        union ring_state s;

        s.state = r->state;
        if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
                return (1);

        return (0);
}