Lines Matching refs:queue
108 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, in _timeout_task_init() argument
113 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
115 timeout_task->q = queue; in _timeout_task_init()
128 task_get_busy(struct taskqueue *queue, struct task *task) in task_get_busy() argument
132 TQ_ASSERT_LOCKED(queue); in task_get_busy()
133 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in task_get_busy()
145 struct taskqueue *queue; in _taskqueue_create() local
152 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
153 if (queue == NULL) { in _taskqueue_create()
160 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
161 LIST_INIT(&queue->tq_active); in _taskqueue_create()
162 queue->tq_enqueue = enqueue; in _taskqueue_create()
163 queue->tq_context = context; in _taskqueue_create()
164 queue->tq_name = tq_name; in _taskqueue_create()
165 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
166 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
171 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
172 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
174 return (queue); in _taskqueue_create()
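
The lines above are the internal constructor; consumers reach it through the public taskqueue(9) KPI. A minimal creation sketch, assuming a hypothetical example_softc driver structure (the struct, its fields, and the function names are illustration only, not part of this file):

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/taskqueue.h>

struct example_softc {
        struct taskqueue *sc_tq;        /* private queue for this driver */
};

static void
example_create_queue(struct example_softc *sc)
{
        /*
         * M_WAITOK allocation cannot fail.  taskqueue_thread_enqueue
         * is the stock enqueue callback for thread-backed queues and
         * takes a pointer to the queue pointer as its context.
         */
        sc->sc_tq = taskqueue_create("example_tq", M_WAITOK,
            taskqueue_thread_enqueue, &sc->sc_tq);

        /* Start one worker thread at PWAIT priority. */
        taskqueue_start_threads(&sc->sc_tq, 1, PWAIT, "example taskq");
}

The later sketches below reuse this hypothetical softc and these includes.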
187 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
196 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
199 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
200 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
217 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
220 TQ_LOCK(queue); in taskqueue_free()
221 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
222 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
223 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
224 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
225 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
226 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
227 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
228 free(queue, M_TASKQUEUE); in taskqueue_free()
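
taskqueue_free() asserts that no tasks are still running and no timeout tasks are armed, so teardown normally drains first. A matching teardown sketch for the hypothetical softc above:

static void
example_destroy_queue(struct example_softc *sc)
{
        if (sc->sc_tq != NULL) {
                /* Wait for queued and running tasks to finish. */
                taskqueue_drain_all(sc->sc_tq);
                /* Stops the worker threads and frees the queue. */
                taskqueue_free(sc->sc_tq);
                sc->sc_tq = NULL;
        }
}

Any armed timeout tasks would additionally need taskqueue_drain_timeout() or taskqueue_cancel_timeout() before the free, or the "Armed timeout tasks" assertion above fires.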
232 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags) in taskqueue_enqueue_locked() argument
243 tb = task_get_busy(queue, task); in taskqueue_enqueue_locked()
245 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
255 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
260 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
270 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
272 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
274 prev = queue->tq_hint; in taskqueue_enqueue_locked()
279 ins = STAILQ_FIRST(&queue->tq_queue); in taskqueue_enqueue_locked()
286 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
287 queue->tq_hint = task; in taskqueue_enqueue_locked()
289 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
293 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
294 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
295 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
296 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
297 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
298 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
305 taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags) in taskqueue_enqueue_flags() argument
309 TQ_LOCK(queue); in taskqueue_enqueue_flags()
310 res = taskqueue_enqueue_locked(queue, task, flags); in taskqueue_enqueue_flags()
317 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
319 return (taskqueue_enqueue_flags(queue, task, 0)); in taskqueue_enqueue()
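
taskqueue_enqueue_locked() above inserts the task into tq_queue in priority order and pokes tq_enqueue; a consumer only touches TASK_INIT() and taskqueue_enqueue(). A sketch reusing the hypothetical softc (in a real driver the task would normally live inside the softc rather than at file scope):

static struct task example_task;

static void
example_task_fn(void *context, int pending)
{
        struct example_softc *sc = context;

        /* "pending" counts enqueues coalesced since the last run. */
        (void)sc;
        (void)pending;
        /* ... deferred work goes here ... */
}

static void
example_defer_work(struct example_softc *sc)
{
        /*
         * TASK_INIT() is normally done once at attach time.  Priority
         * 0 here; higher-priority tasks are placed earlier in the
         * queue by the insertion logic above.
         */
        TASK_INIT(&example_task, 0, example_task_fn, sc);
        taskqueue_enqueue(sc->sc_tq, &example_task);
}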
325 struct taskqueue *queue; in taskqueue_timeout_func() local
329 queue = timeout_task->q; in taskqueue_timeout_func()
332 queue->tq_callouts--; in taskqueue_timeout_func()
338 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
343 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
344 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
346 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
350 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
353 taskqueue_enqueue_locked(queue, &timeout_task->t, 0); in taskqueue_enqueue_timeout_sbt()
359 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
365 if (queue->tq_spin) in taskqueue_enqueue_timeout_sbt()
367 if (queue->tq_spin && queue->tq_tcount == 1 && in taskqueue_enqueue_timeout_sbt()
368 queue->tq_threads[0] == curthread) { in taskqueue_enqueue_timeout_sbt()
376 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
382 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
386 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
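
The timeout variants arm a callout that calls taskqueue_timeout_func(), which in turn enqueues the task; taskqueue_enqueue_timeout() is the tick-based wrapper around the sbintime_t version, as the return at line 386 shows. A delayed-work sketch with a hypothetical timeout task, reusing example_task_fn from the previous sketch:

static struct timeout_task example_tt;

static void
example_defer_delayed(struct example_softc *sc)
{
        /* Normally initialized once, after the queue exists. */
        TIMEOUT_TASK_INIT(sc->sc_tq, &example_tt, 0, example_task_fn, sc);

        /* Enqueue the task roughly one second (hz ticks) from now. */
        taskqueue_enqueue_timeout(sc->sc_tq, &example_tt, hz);
}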
401 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
405 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
417 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
418 queue->tq_hint = &t_barrier; in taskqueue_drain_tq_queue()
426 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); in taskqueue_drain_tq_queue()
436 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
441 if (LIST_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
445 queue->tq_callouts++; in taskqueue_drain_tq_active()
448 seq = queue->tq_seq; in taskqueue_drain_tq_active()
450 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in taskqueue_drain_tq_active()
452 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); in taskqueue_drain_tq_active()
458 queue->tq_callouts--; in taskqueue_drain_tq_active()
459 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
460 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
465 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
468 TQ_LOCK(queue); in taskqueue_block()
469 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
470 TQ_UNLOCK(queue); in taskqueue_block()
474 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
477 TQ_LOCK(queue); in taskqueue_unblock()
478 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
479 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
480 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
481 TQ_UNLOCK(queue); in taskqueue_unblock()
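
taskqueue_block() only sets TQ_FLAGS_BLOCKED, so new tasks still land on tq_queue but tq_enqueue is not called and no worker is woken; taskqueue_unblock() clears the flag and, as the lines above show, kicks tq_enqueue if anything accumulated. A suspend/resume-style sketch:

static void
example_suspend(struct example_softc *sc)
{
        /* Stop kicking the queue; already-running tasks are unaffected. */
        taskqueue_block(sc->sc_tq);
        /* ... quiesce the hypothetical hardware ... */
}

static void
example_resume(struct example_softc *sc)
{
        /* ... wake the hardware ... */
        /* Re-kicks tq_enqueue if tasks were queued while blocked. */
        taskqueue_unblock(sc->sc_tq);
}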
485 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
493 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
494 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
496 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
499 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { in taskqueue_run_locked()
500 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
501 if (queue->tq_hint == task) in taskqueue_run_locked()
502 queue->tq_hint = NULL; in taskqueue_run_locked()
506 tb.tb_seq = ++queue->tq_seq; in taskqueue_run_locked()
508 TQ_UNLOCK(queue); in taskqueue_run_locked()
520 TQ_LOCK(queue); in taskqueue_run_locked()
529 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
532 TQ_LOCK(queue); in taskqueue_run()
533 taskqueue_run_locked(queue); in taskqueue_run()
534 TQ_UNLOCK(queue); in taskqueue_run()
543 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
547 TQ_LOCK(queue); in taskqueue_poll_is_busy()
548 retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL; in taskqueue_poll_is_busy()
549 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
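
taskqueue_poll_is_busy() is the non-blocking counterpart of taskqueue_drain(): it reports whether the task is still queued or currently running, without waiting. Sketch:

static int
example_work_outstanding(struct example_softc *sc)
{
        /* Nonzero while example_task is pending or its handler runs. */
        return (taskqueue_poll_is_busy(sc->sc_tq, &example_task));
}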
555 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
562 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
563 if (queue->tq_hint == task) in taskqueue_cancel_locked()
564 queue->tq_hint = NULL; in taskqueue_cancel_locked()
569 tb = task_get_busy(queue, task); in taskqueue_cancel_locked()
579 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
583 TQ_LOCK(queue); in taskqueue_cancel()
584 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
585 TQ_UNLOCK(queue); in taskqueue_cancel()
591 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
597 TQ_LOCK(queue); in taskqueue_cancel_timeout()
599 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
602 queue->tq_callouts--; in taskqueue_cancel_timeout()
604 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
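
taskqueue_cancel() removes a still-pending task and returns EBUSY if its handler is running right now; the timeout variant additionally stops the callout and drops tq_callouts. A cancellation sketch for the hypothetical timeout task:

static void
example_cancel_delayed(struct example_softc *sc)
{
        /* NULL: the caller does not need the pending count. */
        if (taskqueue_cancel_timeout(sc->sc_tq, &example_tt, NULL) != 0) {
                /* EBUSY: the handler is running; wait it out. */
                taskqueue_drain_timeout(sc->sc_tq, &example_tt);
        }
}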
612 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
615 if (!queue->tq_spin) in taskqueue_drain()
618 TQ_LOCK(queue); in taskqueue_drain()
619 while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL) in taskqueue_drain()
620 TQ_SLEEP(queue, task, "tq_drain"); in taskqueue_drain()
621 TQ_UNLOCK(queue); in taskqueue_drain()
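
taskqueue_drain() blocks until the given task is neither pending nor running. The usual caveats apply: it may sleep, and calling it from the task handler itself or while holding a lock the handler takes would deadlock. Sketch:

static void
example_wait_for_task(struct example_softc *sc)
{
        /* Returns only when example_task is idle; may sleep. */
        taskqueue_drain(sc->sc_tq, &example_task);
}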
625 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
628 if (!queue->tq_spin) in taskqueue_drain_all()
631 TQ_LOCK(queue); in taskqueue_drain_all()
632 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
633 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
634 TQ_UNLOCK(queue); in taskqueue_drain_all()
638 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
645 TQ_LOCK(queue); in taskqueue_drain_timeout()
649 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
652 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
657 TQ_LOCK(queue); in taskqueue_drain_timeout()
659 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
663 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
667 TQ_LOCK(queue); in taskqueue_quiesce()
669 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
671 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
673 TQ_UNLOCK(queue); in taskqueue_quiesce()
904 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
909 if (queue->tq_threads[i] == NULL) in taskqueue_member()
911 if (queue->tq_threads[i] == td) { in taskqueue_member()
915 if (++j >= queue->tq_tcount) in taskqueue_member()
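
taskqueue_member() reports whether a thread is one of the queue's worker threads; a typical use is asserting that a helper only runs from the queue's context. Sketch:

static void
example_queue_only(struct example_softc *sc)
{
        KASSERT(taskqueue_member(sc->sc_tq, curthread),
            ("%s: not called from the example taskqueue", __func__));
        /* ... work that relies on the queue's serialization ... */
}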