Lines Matching refs:queue

109 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,  in _timeout_task_init()  argument
114 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
116 timeout_task->q = queue; in _timeout_task_init()
133 struct taskqueue *queue; in _taskqueue_create() local
140 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
141 if (queue == NULL) { in _taskqueue_create()
148 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
149 LIST_INIT(&queue->tq_active); in _taskqueue_create()
150 queue->tq_enqueue = enqueue; in _taskqueue_create()
151 queue->tq_context = context; in _taskqueue_create()
152 queue->tq_name = tq_name; in _taskqueue_create()
153 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
154 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
159 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
160 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
162 return (queue); in _taskqueue_create()
175 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
184 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
187 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
188 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
205 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
208 TQ_LOCK(queue); in taskqueue_free()
209 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
210 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
211 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
212 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
213 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
214 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
215 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
216 free(queue, M_TASKQUEUE); in taskqueue_free()
220 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) in taskqueue_enqueue_locked() argument
232 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
242 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
244 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
246 prev = queue->tq_hint; in taskqueue_enqueue_locked()
251 ins = STAILQ_FIRST(&queue->tq_queue); in taskqueue_enqueue_locked()
258 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
259 queue->tq_hint = task; in taskqueue_enqueue_locked()
261 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
265 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
266 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
267 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
268 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
269 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
270 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
277 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
281 TQ_LOCK(queue); in taskqueue_enqueue()
282 res = taskqueue_enqueue_locked(queue, task); in taskqueue_enqueue()
291 struct taskqueue *queue; in taskqueue_timeout_func() local
295 queue = timeout_task->q; in taskqueue_timeout_func()
298 queue->tq_callouts--; in taskqueue_timeout_func()
304 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
309 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
310 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
312 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
316 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
319 taskqueue_enqueue_locked(queue, &timeout_task->t); in taskqueue_enqueue_timeout_sbt()
325 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
331 if (queue->tq_spin) in taskqueue_enqueue_timeout_sbt()
336 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
342 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
346 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
361 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
365 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
377 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
378 queue->tq_hint = &t_barrier; in taskqueue_drain_tq_queue()
386 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); in taskqueue_drain_tq_queue()
396 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
401 if (LIST_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
405 queue->tq_callouts++; in taskqueue_drain_tq_active()
408 seq = queue->tq_seq; in taskqueue_drain_tq_active()
410 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in taskqueue_drain_tq_active()
412 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); in taskqueue_drain_tq_active()
418 queue->tq_callouts--; in taskqueue_drain_tq_active()
419 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
420 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
425 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
428 TQ_LOCK(queue); in taskqueue_block()
429 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
430 TQ_UNLOCK(queue); in taskqueue_block()
434 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
437 TQ_LOCK(queue); in taskqueue_unblock()
438 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
439 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
440 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
441 TQ_UNLOCK(queue); in taskqueue_unblock()
445 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
453 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
454 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
456 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
459 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { in taskqueue_run_locked()
460 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
461 if (queue->tq_hint == task) in taskqueue_run_locked()
462 queue->tq_hint = NULL; in taskqueue_run_locked()
466 tb.tb_seq = ++queue->tq_seq; in taskqueue_run_locked()
467 TQ_UNLOCK(queue); in taskqueue_run_locked()
479 TQ_LOCK(queue); in taskqueue_run_locked()
488 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
491 TQ_LOCK(queue); in taskqueue_run()
492 taskqueue_run_locked(queue); in taskqueue_run()
493 TQ_UNLOCK(queue); in taskqueue_run()
497 task_is_running(struct taskqueue *queue, struct task *task) in task_is_running() argument
501 TQ_ASSERT_LOCKED(queue); in task_is_running()
502 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in task_is_running()
515 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
519 TQ_LOCK(queue); in taskqueue_poll_is_busy()
520 retval = task->ta_pending > 0 || task_is_running(queue, task); in taskqueue_poll_is_busy()
521 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
527 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
532 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
533 if (queue->tq_hint == task) in taskqueue_cancel_locked()
534 queue->tq_hint = NULL; in taskqueue_cancel_locked()
539 return (task_is_running(queue, task) ? EBUSY : 0); in taskqueue_cancel_locked()
543 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
547 TQ_LOCK(queue); in taskqueue_cancel()
548 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
549 TQ_UNLOCK(queue); in taskqueue_cancel()
555 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
561 TQ_LOCK(queue); in taskqueue_cancel_timeout()
563 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
566 queue->tq_callouts--; in taskqueue_cancel_timeout()
568 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
576 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
579 if (!queue->tq_spin) in taskqueue_drain()
582 TQ_LOCK(queue); in taskqueue_drain()
583 while (task->ta_pending != 0 || task_is_running(queue, task)) in taskqueue_drain()
584 TQ_SLEEP(queue, task, "tq_drain"); in taskqueue_drain()
585 TQ_UNLOCK(queue); in taskqueue_drain()
589 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
592 if (!queue->tq_spin) in taskqueue_drain_all()
595 TQ_LOCK(queue); in taskqueue_drain_all()
596 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
597 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
598 TQ_UNLOCK(queue); in taskqueue_drain_all()
602 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
609 TQ_LOCK(queue); in taskqueue_drain_timeout()
613 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
616 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
621 TQ_LOCK(queue); in taskqueue_drain_timeout()
623 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
627 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
631 TQ_LOCK(queue); in taskqueue_quiesce()
633 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
635 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
637 TQ_UNLOCK(queue); in taskqueue_quiesce()
868 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
873 if (queue->tq_threads[i] == NULL) in taskqueue_member()
875 if (queue->tq_threads[i] == td) { in taskqueue_member()
879 if (++j >= queue->tq_tcount) in taskqueue_member()