/*-
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/epoch.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

struct taskqueue_busy {
	struct task		*tb_running;
	u_int			 tb_seq;
	bool			 tb_canceling;
	LIST_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	LIST_HEAD(, taskqueue_busy) tq_active;
	struct task		*tq_hint;
	u_int			tq_seq;
	int			tq_callouts;
	struct mtx_padalign	tq_mutex;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)

#define	TQ_LOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_lock_spin(&(tq)->tq_mutex);		\
		else						\
			mtx_lock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)						\
	do {							\
		if ((tq)->tq_spin)				\
			mtx_unlock_spin(&(tq)->tq_mutex);	\
		else						\
			mtx_unlock(&(tq)->tq_mutex);		\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
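
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): how a consumer typically embeds and queues a task against the
 * structures above.  The softc type, field names and handler are
 * hypothetical; TASK_INIT(), taskqueue_enqueue() and the system
 * "taskqueue_thread" queue are the taskqueue(9) interfaces defined in
 * this file and its header.
 *
 *	struct foo_softc {
 *		struct task	sc_task;	(embedded, not allocated)
 *	};
 *
 *	static void
 *	foo_task_fn(void *context, int pending)
 *	{
 *		struct foo_softc *sc = context;
 *		(pending counts enqueues coalesced since the last run)
 *	}
 *
 *	TASK_INIT(&sc->sc_task, 0, foo_task_fn, sc);	(once, at attach)
 *	taskqueue_enqueue(taskqueue_thread, &sc->sc_task);
 */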

void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
{
	if (tq->tq_spin)
		return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
	return (msleep(p, &tq->tq_mutex, 0, wm, 0));
}

static struct taskqueue_busy *
task_get_busy(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (tb);
	}
	return (NULL);
}

static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context,
    int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	LIST_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_DEF, name);
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}
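
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): registering a shutdown callback on a private queue so per-queue
 * state can be released from the queue thread itself as it exits.  The
 * foo_tq_fini() handler and "sc" context are hypothetical; the callback
 * type constants come from taskqueue(9).
 *
 *	static void
 *	foo_tq_fini(void *arg)
 *	{
 *		(runs in the taskqueue thread just before it terminates)
 *	}
 *
 *	taskqueue_set_callback(sc->sc_tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
 *	    foo_tq_fini, sc);
 */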

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, "tq_destroy");
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags)
{
	struct task *ins;
	struct task *prev;
	struct taskqueue_busy *tb;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * If requested, fail if the task is currently being canceled.
	 */
	if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) {
		tb = task_get_busy(queue, task);
		if (tb != NULL && tb->tb_canceling) {
			TQ_UNLOCK(queue);
			return (ECANCELED);
		}
	}

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) {
			TQ_UNLOCK(queue);
			return (EEXIST);
		}
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise cases when all tasks use a small set of priorities.
	 * In case of only one priority we always insert at the end.
	 * In case of two, tq_hint typically gives the insertion point.
	 * In case of more than two, tq_hint should halve the search.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = queue->tq_hint;
		if (prev && prev->ta_priority >= task->ta_priority) {
			ins = STAILQ_NEXT(prev, ta_link);
		} else {
			prev = NULL;
			ins = STAILQ_FIRST(&queue->tq_queue);
		}
		for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev) {
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
			queue->tq_hint = task;
		} else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task, flags);
	/* The lock is released inside. */

	return (res);
}
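
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): using the _flags variant so a caller can tell an already-pending
 * task apart from one that lost a race with taskqueue_cancel().  The
 * sc_tq/sc_task names are hypothetical; the flags and errno values are
 * exactly the ones handled in taskqueue_enqueue_locked() above.
 *
 *	error = taskqueue_enqueue_flags(sc->sc_tq, &sc->sc_task,
 *	    TASKQUEUE_FAIL_IF_PENDING | TASKQUEUE_FAIL_IF_CANCELING);
 *	if (error == EEXIST)
 *		;	(already queued, nothing to do)
 *	else if (error == ECANCELED)
 *		;	(a concurrent cancel is in progress)
 */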

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	return (taskqueue_enqueue_flags(queue, task, 0));
}

static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0);
	/* The lock is released inside. */
}

int
taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
    struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (sbt == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t, 0);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (sbt < 0)
				sbt = -sbt; /* Ignore overflow. */
		}
		if (sbt > 0) {
			if (queue->tq_spin)
				flags |= C_DIRECT_EXEC;
			if (queue->tq_spin && queue->tq_tcount == 1 &&
			    queue->tq_threads[0] == curthread) {
				callout_reset_sbt_curcpu(&timeout_task->c, sbt, pr,
				    taskqueue_timeout_func, timeout_task, flags);
			} else {
				callout_reset_sbt(&timeout_task->c, sbt, pr,
				    taskqueue_timeout_func, timeout_task, flags);
			}
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *ttask, int ticks)
{

	return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
	    0, C_HARDCLOCK));
}

static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static int
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return (0);

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we cannot use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway), so just insert it at the tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	queue->tq_hint = &t_barrier;
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
	return (1);
}
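
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): arming a delayed task roughly one second out with the tick-based
 * wrapper above.  sc_tq, sc_ttask and foo_timer_fn are hypothetical;
 * TIMEOUT_TASK_INIT() is the taskqueue(9) front end for
 * _timeout_task_init() defined earlier in this file.
 *
 *	TIMEOUT_TASK_INIT(sc->sc_tq, &sc->sc_ttask, 0, foo_timer_fn, sc);
 *	...
 *	taskqueue_enqueue_timeout(sc->sc_tq, &sc->sc_ttask, hz);
 *
 * Per the code above, 0 means the callout was newly armed, a positive
 * value means the task or its callout was already pending, and -1 means
 * a drain was in progress and the request was ignored.
 */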

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static int
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy *tb;
	u_int seq;

	if (LIST_EMPTY(&queue->tq_active))
		return (0);

	/* Block taskqueue_terminate(). */
	queue->tq_callouts++;

	/* Wait for any active task with a sequence from the past. */
	seq = queue->tq_seq;
restart:
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if ((int)(tb->tb_seq - seq) <= 0) {
			TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
			goto restart;
		}
	}

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
	return (1);
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct epoch_tracker et;
	struct taskqueue_busy tb;
	struct task *task;
	bool in_net_epoch;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
	in_net_epoch = false;

	while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		tb.tb_seq = ++queue->tq_seq;
		tb.tb_canceling = false;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		if (!in_net_epoch && TASK_IS_NET(task)) {
			in_net_epoch = true;
			NET_EPOCH_ENTER(et);
		} else if (in_net_epoch && !TASK_IS_NET(task)) {
			NET_EPOCH_EXIT(et);
			in_net_epoch = false;
		}
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		wakeup(task);
	}
	if (in_net_epoch)
		NET_EPOCH_EXIT(et);
	LIST_REMOVE(&tb, tb_link);
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

/*
 * Only use this function in single-threaded contexts.  It returns
 * non-zero if the given task is either pending or running.  Otherwise
 * the task is idle and can be queued again or freed.
 */
int
taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
{
	int retval;

	TQ_LOCK(queue);
	retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL;
	TQ_UNLOCK(queue);

	return (retval);
}

static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{
	struct taskqueue_busy *tb;
	int retval = 0;

	if (task->ta_pending > 0) {
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
	}
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	tb = task_get_busy(queue, task);
	if (tb != NULL) {
		tb->tb_canceling = true;
		retval = EBUSY;
	}

	return (retval);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL)
		TQ_SLEEP(queue, task, "tq_drain");
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	(void)taskqueue_drain_tq_queue(queue);
	(void)taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set the flag to prevent the timer from re-starting during the
	 * drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear the flag to allow the timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

void
taskqueue_quiesce(struct taskqueue *queue)
{
	int ret;

	TQ_LOCK(queue);
	do {
		ret = taskqueue_drain_tq_queue(queue);
		if (ret == 0)
			ret = taskqueue_drain_tq_active(queue);
	} while (ret != 0);
	TQ_UNLOCK(queue);
}
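
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): a typical detach-time ordering built from the cancel/drain
 * primitives above.  The softc field names are hypothetical.
 *
 *	(stop new work, then wait out anything already queued or running)
 *	if (taskqueue_cancel(sc->sc_tq, &sc->sc_task, NULL) != 0)
 *		taskqueue_drain(sc->sc_tq, &sc->sc_task);
 *	taskqueue_drain_timeout(sc->sc_tq, &sc->sc_ttask);
 *	taskqueue_free(sc->sc_tq);
 *
 * taskqueue_cancel() returns EBUSY while the task is still running, in
 * which case taskqueue_drain() sleeps until it has finished.
 */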

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, struct proc *p, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;	/* paranoid */
		} else
			tq->tq_tcount++;
	}
	if (tq->tq_tcount == 0) {
		free(tq->tq_threads, M_TASKQUEUE);
		tq->tq_threads = NULL;
		return (ENOMEM);
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
	}

	return (0);
}

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
    struct proc *proc, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
	va_end(ap);
	return (error);
}

int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
	va_end(ap);
	return (error);
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}
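
/*
 * Illustrative usage sketch (editor's addition, not part of the original
 * file): creating a private queue serviced by its own kernel thread,
 * which is the pattern the enqueue/loop pair below implements.  The
 * sc_tq field, queue name and sc_dev are hypothetical; PWAIT is a
 * standard scheduling priority.
 *
 *	sc->sc_tq = taskqueue_create("foo_taskq", M_WAITOK,
 *	    taskqueue_thread_enqueue, &sc->sc_tq);
 *	taskqueue_start_threads(&sc->sc_tq, 1, PWAIT, "%s taskq",
 *	    device_get_nameunit(sc->sc_dev));
 *
 * The context passed to taskqueue_create() must be the address of the
 * queue pointer itself, because taskqueue_thread_enqueue() and
 * taskqueue_thread_loop() dereference it to find the queue.
 */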

void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check whether the TQ_FLAGS_ACTIVE flag was cleared in
		 * the meantime, which would mean we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, "-");
	}
	taskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_any(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
    swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
    INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
    swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
    NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
    taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
    swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
    SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}