/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <[email protected]>
 *   Andrew Morton
 *   Kai Petzke <[email protected]>
 *   Theodore Ts'o <[email protected]>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <trace/workqueue.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
 * which comes in between can't use for_each_online_cpu(). We could
 * use cpu_possible_map, the cpumask below is more a documentation
 * than optimization.
 */
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}
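
/*
 * Example (illustrative sketch, not part of this file): callers normally
 * reach __create_workqueue_key() below through the wrappers in
 * <linux/workqueue.h>, which is how ->singlethread, ->freezeable and ->rt
 * get set.  "my_wq" is a made-up name:
 *
 *	struct workqueue_struct *wq;
 *
 *	wq = create_workqueue("my_wq");			// one thread per CPU
 *	wq = create_singlethread_workqueue("my_wq");	// one thread total
 *	wq = create_freezeable_workqueue("my_wq");	// frozen for suspend
 *	wq = create_rt_workqueue("my_wq");		// SCHED_FIFO threads
 */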

static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
				struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

DEFINE_TRACE(workqueue_insertion);

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	trace_workqueue_insertion(cwq->thread, work);

	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);
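
/*
 * Example (illustrative sketch, not part of this file): a typical caller
 * embeds a work_struct in its own data, initialises it once with
 * INIT_WORK() and then queues it, e.g. from an interrupt handler.  The
 * names my_dev, my_dev_work and my_irq are hypothetical:
 *
 *	struct my_dev {
 *		struct workqueue_struct *wq;
 *		struct work_struct work;
 *	};
 *
 *	static void my_dev_work(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev, work);
 *		// runs in process context on dev->wq's worker thread
 *	}
 *
 *	// setup:	INIT_WORK(&dev->work, my_dev_work);
 *	// in my_irq:	queue_work(dev->wq, &dev->work);
 *
 * queue_work() returns 0 and does nothing if the work is already pending.
 */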

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
	struct workqueue_struct *wq = cwq->wq;

	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
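
/*
 * Example (illustrative sketch, not part of this file): delayed work uses
 * struct delayed_work so that the embedded timer can requeue the work when
 * it expires.  my_poll_work, my_wq (a workqueue created elsewhere) and
 * POLL_INTERVAL are made-up names:
 *
 *	static void my_poll(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_poll_work, my_poll);
 *
 *	static void my_poll(struct work_struct *work)
 *	{
 *		// ... poll the hardware ...
 *		// self-rearm: run again POLL_INTERVAL jiffies from now
 *		queue_delayed_work(my_wq, &my_poll_work, POLL_INTERVAL);
 *	}
 *
 *	// kick off the first run immediately:
 *	queue_delayed_work(my_wq, &my_poll_work, 0);
 *
 * A delay of 0 bypasses the timer and queues the work directly.
 */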

DEFINE_TRACE(workqueue_execution);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__func__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct
		 * from inside the function that is called from it,
		 * this we need to take into account for lockdep too.
		 * To avoid bogus "held lock freed" warnings as well
		 * as problems when looking into work->lockdep_map,
		 * make a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif
		trace_workqueue_execution(cwq->thread, work);
		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irq(&cwq->lock);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	set_user_nice(current, -5);

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, head);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active;

	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
		active = 1;
	} else {
		struct wq_barrier barr;

		active = 0;
		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, &cwq->worklist);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}

	return active;
}
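
/*
 * Example (illustrative sketch, not part of this file): the wq_barrier
 * above is the same completion pattern a driver can use by hand to wait
 * for one of its own work items.  my_sync_work/my_sync_func are made-up
 * names:
 *
 *	struct my_sync_work {
 *		struct work_struct work;
 *		struct completion done;
 *	};
 *
 *	static void my_sync_func(struct work_struct *work)
 *	{
 *		struct my_sync_work *s =
 *			container_of(work, struct my_sync_work, work);
 *		// ... do the job ...
 *		complete(&s->done);
 *	}
 *
 *	// caller (on-stack, like struct wq_barrier in flush_cpu_workqueue):
 *	//	struct my_sync_work s;
 *	//	INIT_WORK(&s.work, my_sync_func);
 *	//	init_completion(&s.done);
 *	//	queue_work(wq, &s.work);
 *	//	wait_for_completion(&s.done);
 */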

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu_mask_nr(cpu, *cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	prev = NULL;
	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto out;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto out;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);
out:
	spin_unlock_irq(&cwq->lock);
	if (!prev)
		return 0;

	wait_for_completion(&barr.done);
	return 1;
}
EXPORT_SYMBOL_GPL(flush_work);
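
/*
 * Example (illustrative sketch, not part of this file): a driver remove
 * path typically stops new submissions first and only then flushes, so
 * that nothing can requeue behind the flush.  my_dev/my_remove and the
 * shutting_down flag are hypothetical:
 *
 *	static void my_remove(struct my_dev *dev)
 *	{
 *		dev->shutting_down = 1;		// my_dev_work checks this
 *						// before requeueing itself
 *		flush_workqueue(dev->wq);	// wait for everything queued
 *		destroy_workqueue(dev->wq);
 *	}
 *
 * flush_work(&dev->work) could be used instead when only one specific
 * work item needs to be waited for.
 */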

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */

	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running))
		wait_for_completion(&barr.done);
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu_mask_nr(cpu, *cpu_map)
		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list *timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	work_clear_pending(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that the workqueue_struct on which this work was
 * last queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);

static struct workqueue_struct *keventd_wq __read_mostly;
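
/*
 * Example (illustrative sketch, not part of this file): teardown of a
 * self-rearming delayed work, such as the hypothetical my_poll_work from
 * the earlier sketch:
 *
 *	static void my_stop_polling(struct my_dev *dev)
 *	{
 *		// Kills the pending timer, steals the work off the queue if
 *		// already queued, and waits for a running callback to finish.
 *		cancel_delayed_work_sync(&dev->poll_work);
 *	}
 *
 * For a plain work_struct the equivalent is cancel_work_sync(&dev->work);
 * neither may be called while holding a lock that the callback also takes.
 */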

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/**
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu.
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		schedule_work_on(cpu, work);
	}
	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));
	put_online_cpus();
	free_percpu(works);
	return 0;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);
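
/*
 * Example (illustrative sketch, not part of this file): users who do not
 * need their own workqueue simply go through keventd_wq via the wrappers
 * above.  my_late_init is a made-up name:
 *
 *	static void my_late_init(struct work_struct *work)
 *	{
 *		// deferred, non-critical setup done in process context
 *	}
 *	static DECLARE_WORK(my_late_init_work, my_late_init);
 *
 *	// from almost anywhere (including interrupt context):
 *	schedule_work(&my_late_init_work);
 *
 *	// or run the function once on every online CPU and wait:
 *	schedule_on_each_cpu(my_late_init);
 *
 * flush_scheduled_work() must not be called while holding a lock that any
 * work on keventd_wq might also take.
 */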

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn: the function to execute
 * @ew: guaranteed storage for the execute work structure (must
 *	be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
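
/*
 * Example (illustrative sketch, not part of this file): a release routine
 * that may be reached from either process or interrupt context can defer
 * itself with execute_in_process_context().  my_obj/my_obj_release are
 * hypothetical; the execute_work must live at least as long as the object:
 *
 *	struct my_obj {
 *		struct execute_work ew;
 *		// ...
 *	};
 *
 *	static void my_obj_release(struct work_struct *work)
 *	{
 *		struct my_obj *obj = container_of(work, struct my_obj, ew.work);
 *		kfree(obj);
 *	}
 *
 *	// runs now if possible, otherwise via the global workqueue:
 *	execute_in_process_context(my_obj_release, &obj->ew);
 */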

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

DEFINE_TRACE(workqueue_creation);

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (cwq->wq->rt)
		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
	cwq->thread = p;

	trace_workqueue_creation(cwq->thread, cpu);

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

DEFINE_TRACE(workqueue_destruction);

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * However, in that case run_workqueue() won't return and check
	 * kthread_should_stop() until it flushes all work_struct's.
	 * When ->worklist becomes empty it is safe to exit because no
	 * more work_structs can be queued on this cwq: flush_workqueue
	 * checks list_empty(), and a "normal" queue_work() can't use
	 * a dead CPU.
	 */
	trace_workqueue_destruction(cwq->thread);
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}
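
/*
 * Example (illustrative sketch, not part of this file): the usual lifetime
 * of a driver-private workqueue built on the functions above.  my_wq,
 * my_init and my_exit are made-up names:
 *
 *	static struct workqueue_struct *my_wq;
 *
 *	static int __init my_init(void)
 *	{
 *		my_wq = create_workqueue("my_wq");
 *		if (!my_wq)
 *			return -ENOMEM;
 *		return 0;
 *	}
 *
 *	static void __exit my_exit(void)
 *	{
 *		// destroy_workqueue() flushes all pending work first
 *		destroy_workqueue(my_wq);
 *	}
 */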

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu_mask_nr(cpu, *cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int ret = NOTIFY_OK;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			ret = NOTIFY_BAD;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return ret;
}

#ifdef CONFIG_SMP
static struct workqueue_struct *work_on_cpu_wq __read_mostly;

struct work_for_cpu {
	struct work_struct work;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static void do_work_for_cpu(struct work_struct *w)
{
	struct work_for_cpu *wfc = container_of(w, struct work_for_cpu, work);

	wfc->ret = wfc->fn(wfc->arg);
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct work_for_cpu wfc;

	INIT_WORK(&wfc.work, do_work_for_cpu);
	wfc.fn = fn;
	wfc.arg = arg;
	queue_work_on(cpu, work_on_cpu_wq, &wfc.work);
	flush_work(&wfc.work);

	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
#ifdef CONFIG_SMP
	work_on_cpu_wq = create_workqueue("work_on_cpu");
	BUG_ON(!work_on_cpu_wq);
#endif
}
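
/*
 * Example (illustrative sketch, not part of this file): work_on_cpu() lets
 * a caller run a function synchronously on a chosen CPU without writing
 * its own work item.  read_my_counter and my_arg are made-up names:
 *
 *	static long read_my_counter(void *arg)
 *	{
 *		// executes in process context on the target CPU
 *		return do_something_cpu_local(arg);
 *	}
 *
 *	// caller keeps the CPU online (e.g. get_online_cpus()) and then:
 *	long val = work_on_cpu(cpu, read_my_counter, &my_arg);
 */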