/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/epoch.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static void	 taskqueue_fast_enqueue(void *);
static void	 taskqueue_swi_enqueue(void *);
static void	 taskqueue_swi_giant_enqueue(void *);

struct taskqueue_busy {
	struct task		*tb_running;
	u_int			 tb_seq;
	LIST_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;
	LIST_HEAD(, taskqueue_busy) tq_active;
	struct task		*tq_hint;
	u_int			tq_seq;
	int			tq_callouts;
	struct mtx_padalign	tq_mutex;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)
#define	DT_DRAIN_IN_PROGRESS	(1 << 1)

#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
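
/*
 * Quick reference (illustrative sketch, not part of this file): a typical
 * consumer of this API creates a queue backed by a single kernel thread,
 * queues work, and drains before tearing down.  The handler and argument
 * names below are hypothetical; the handler has the task_fn_t signature
 * void my_handler(void *context, int pending).
 *
 *	struct taskqueue *tq;
 *	struct task tk;
 *
 *	TASK_INIT(&tk, 0, my_handler, my_arg);
 *	tq = taskqueue_create("my_tq", M_WAITOK, taskqueue_thread_enqueue,
 *	    &tq);
 *	taskqueue_start_threads(&tq, 1, PWAIT, "my_tq");
 *	taskqueue_enqueue(tq, &tk);
 *	...
 *	taskqueue_drain(tq, &tk);
 *	taskqueue_free(tq);
 */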

void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

	TASK_INIT(&timeout_task->t, priority, func, context);
	callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
	    CALLOUT_RETURNUNLOCKED);
	timeout_task->q = queue;
	timeout_task->f = 0;
}

#ifndef FSTACK
static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm)
{
	if (tq->tq_spin)
		return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0));
	return (msleep(p, &tq->tq_mutex, 0, wm, 0));
}
#else
#define	TQ_SLEEP(a, b, c)	break;
#endif

static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct taskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO);
	if (tq_name == NULL)
		return (NULL);

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL) {
		free(tq_name, M_TASKQUEUE);
		return (NULL);
	}

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	STAILQ_INIT(&queue->tq_queue);
	LIST_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == taskqueue_fast_enqueue ||
	    enqueue == taskqueue_swi_enqueue ||
	    enqueue == taskqueue_swi_giant_enqueue ||
	    enqueue == taskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{

	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_DEF, name);
}

void
taskqueue_set_callback(struct taskqueue *queue,
    enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
    void *context)
{

	KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
	    (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
	    ("Callback type %d not valid, must be %d-%d", cb_type,
	    TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
	KASSERT((queue->tq_callbacks[cb_type] == NULL),
	    ("Re-initialization of taskqueue callback?"));

	queue->tq_callbacks[cb_type] = callback;
	queue->tq_cb_contexts[cb_type] = context;
}
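
/*
 * Illustrative sketch (not part of this file): a timeout task defers a
 * regular task through a callout.  The handler and argument names are
 * hypothetical; TIMEOUT_TASK_INIT() is the public wrapper around
 * _timeout_task_init() above.
 *
 *	struct timeout_task tt;
 *
 *	TIMEOUT_TASK_INIT(tq, &tt, 0, my_handler, my_arg);
 *	taskqueue_enqueue_timeout(tq, &tt, hz);	(run roughly 1 second later)
 *	...
 *	taskqueue_drain_timeout(tq, &tt);
 */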

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, "tq_destroy");
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise cases when all tasks use a small set of priorities.
	 * In case of only one priority we always insert at the end.
	 * In case of two tq_hint typically gives the insertion point.
	 * In case of more than two tq_hint should halve the search.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = queue->tq_hint;
		if (prev && prev->ta_priority >= task->ta_priority) {
			ins = STAILQ_NEXT(prev, ta_link);
		} else {
			prev = NULL;
			ins = STAILQ_FIRST(&queue->tq_queue);
		}
		for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev) {
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
			queue->tq_hint = task;
		} else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int res;

	TQ_LOCK(queue);
	res = taskqueue_enqueue_locked(queue, task);
	/* The lock is released inside. */

	return (res);
}

static void
taskqueue_timeout_func(void *arg)
{
	struct taskqueue *queue;
	struct timeout_task *timeout_task;

	timeout_task = arg;
	queue = timeout_task->q;
	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
	timeout_task->f &= ~DT_CALLOUT_ARMED;
	queue->tq_callouts--;
	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
	/* The lock is released inside. */
}
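
/*
 * Scheduling notes for taskqueue_enqueue_timeout_sbt() below: an sbt of 0
 * queues the task immediately; a negative sbt is treated as its absolute
 * value, except that an already-armed callout is left alone (the timeout
 * is not pushed out).  The return value is -1 when the request is ignored
 * because a drain is in progress; otherwise a non-zero value indicates the
 * task or its callout was already pending, and zero that it was idle.
 */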

int
taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
    struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags)
{
	int res;

	TQ_LOCK(queue);
	KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
	    ("Migrated queue"));
	KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
	timeout_task->q = queue;
	res = timeout_task->t.ta_pending;
	if (timeout_task->f & DT_DRAIN_IN_PROGRESS) {
		/* Do nothing */
		TQ_UNLOCK(queue);
		res = -1;
	} else if (sbt == 0) {
		taskqueue_enqueue_locked(queue, &timeout_task->t);
		/* The lock is released inside. */
	} else {
		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
			res++;
		} else {
			queue->tq_callouts++;
			timeout_task->f |= DT_CALLOUT_ARMED;
			if (sbt < 0)
				sbt = -sbt; /* Ignore overflow. */
		}
		if (sbt > 0) {
			callout_reset_sbt(&timeout_task->c, sbt, pr,
			    taskqueue_timeout_func, timeout_task, flags);
		}
		TQ_UNLOCK(queue);
	}
	return (res);
}

int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *ttask, int ticks)
{

	return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
	    0, 0));
}

static void
taskqueue_task_nop_fn(void *context, int pending)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static int
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return (0);

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we cannot use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	queue->tq_hint = &t_barrier;
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
	return (1);
}
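
/*
 * Implementation note: each worker records the task it is running, along
 * with a sequence number taken from tq_seq, in a taskqueue_busy entry on
 * tq_active.  taskqueue_drain_tq_active() below snapshots tq_seq and then
 * waits for every busy entry whose sequence is not newer than the
 * snapshot; the signed difference (int)(tb->tb_seq - seq) keeps the
 * comparison correct even when the unsigned counter wraps around.  Bumping
 * tq_callouts for the duration keeps taskqueue_terminate() from finishing
 * while the drain is sleeping on the queue.
 */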

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static int
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy *tb;
	u_int seq;

	if (LIST_EMPTY(&queue->tq_active))
		return (0);

	/* Block taskqueue_terminate(). */
	queue->tq_callouts++;

	/* Wait for any active task with sequence from the past. */
	seq = queue->tq_seq;
restart:
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if ((int)(tb->tb_seq - seq) <= 0) {
			TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
			goto restart;
		}
	}

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
	return (1);
}

void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct epoch_tracker et;
	struct taskqueue_busy tb;
	struct task *task;
	bool in_net_epoch;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;
	LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
	in_net_epoch = false;

	while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		tb.tb_seq = ++queue->tq_seq;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		if (!in_net_epoch && TASK_IS_NET(task)) {
			in_net_epoch = true;
			NET_EPOCH_ENTER(et);
		} else if (in_net_epoch && !TASK_IS_NET(task)) {
			NET_EPOCH_EXIT(et);
			in_net_epoch = false;
		}
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		wakeup(task);
	}
	if (in_net_epoch)
		NET_EPOCH_EXIT(et);
	LIST_REMOVE(&tb, tb_link);
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

/*
 * Only use this function in single threaded contexts.  It returns
 * non-zero if the given task is either pending or running.  Else the
 * task is idle and can be queued again or freed.
 */
int
taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
{
	int retval;

	TQ_LOCK(queue);
	retval = task->ta_pending > 0 || task_is_running(queue, task);
	TQ_UNLOCK(queue);

	return (retval);
}

static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

	if (task->ta_pending > 0) {
		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
		if (queue->tq_hint == task)
			queue->tq_hint = NULL;
	}
	if (pendp != NULL)
		*pendp = task->ta_pending;
	task->ta_pending = 0;
	return (task_is_running(queue, task) ? EBUSY : 0);
}
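
/*
 * Cancellation can only pull a task off the queue; once a worker has
 * dequeued it, the task runs to completion and EBUSY is returned.  The
 * usual pattern (illustrative, with a hypothetical task "tk") is to fall
 * back to draining:
 *
 *	if (taskqueue_cancel(tq, &tk, NULL) != 0)
 *		taskqueue_drain(tq, &tk);
 */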

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_cancel_locked(queue, task, pendp);
	TQ_UNLOCK(queue);

	return (error);
}

int
taskqueue_cancel_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, u_int *pendp)
{
	u_int pending, pending1;
	int error;

	TQ_LOCK(queue);
	pending = !!(callout_stop(&timeout_task->c) > 0);
	error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
	if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
		timeout_task->f &= ~DT_CALLOUT_ARMED;
		queue->tq_callouts--;
	}
	TQ_UNLOCK(queue);

	if (pendp != NULL)
		*pendp = pending + pending1;
	return (error);
}

void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while (task->ta_pending != 0 || task_is_running(queue, task))
		TQ_SLEEP(queue, task, "tq_drain");
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_all(struct taskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	(void)taskqueue_drain_tq_queue(queue);
	(void)taskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	/*
	 * Set flag to prevent timer from re-starting during drain:
	 */
	TQ_LOCK(queue);
	KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0,
	    ("Drain already in progress"));
	timeout_task->f |= DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);

	/*
	 * Clear flag to allow timer to re-start:
	 */
	TQ_LOCK(queue);
	timeout_task->f &= ~DT_DRAIN_IN_PROGRESS;
	TQ_UNLOCK(queue);
}

void
taskqueue_quiesce(struct taskqueue *queue)
{
	int ret;

	TQ_LOCK(queue);
	do {
		ret = taskqueue_drain_tq_queue(queue);
		if (ret == 0)
			ret = taskqueue_drain_tq_active(queue);
	} while (ret != 0);
	TQ_UNLOCK(queue);
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}
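
/*
 * Worker creation below follows the usual kernel pattern: each thread is
 * added with RFSTOPPED so it does not run until its priority (and optional
 * CPU affinity) has been set, after which sched_add() makes it runnable.
 * In the F-Stack build (FSTACK defined) this path is compiled out and the
 * taskqueue_start_threads*() wrappers simply return 0 without creating any
 * kernel threads.
 */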

#ifndef FSTACK
static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, struct proc *p, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct taskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(taskqueue_thread_loop, tqp, p,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;	/* paranoid */
		} else
			tq->tq_tcount++;
	}
	if (tq->tq_tcount == 0) {
		free(tq->tq_threads, M_TASKQUEUE);
		tq->tq_threads = NULL;
		return (ENOMEM);
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long)td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
	}

	return (0);
}
#endif

int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
#ifndef FSTACK
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap);
	va_end(ap);
	return (error);
#else
	return (0);
#endif
}

int
taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri,
    struct proc *proc, const char *name, ...)
{
#ifndef FSTACK
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap);
	va_end(ap);
	return (error);
#else
	return (0);
#endif
}

int
taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, ...)
{
#ifndef FSTACK
	va_list ap;
	int error;

	va_start(ap, name);
	error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap);
	va_end(ap);
	return (error);
#else
	return (0);
#endif
}

static inline void
taskqueue_run_callback(struct taskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, "-");
	}
	taskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_any(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
    swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
	INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
    swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
	NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue");
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
    swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;

	for (i = 0, j = 0; ; i++) {
		if (queue->tq_threads[i] == NULL)
			continue;
		if (queue->tq_threads[i] == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_tcount)
			break;
	}
	return (ret);
}