Lines Matching refs:queue
107 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, in _timeout_task_init() argument
112 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
114 timeout_task->q = queue; in _timeout_task_init()
132 struct taskqueue *queue; in _taskqueue_create() local
139 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
140 if (queue == NULL) { in _taskqueue_create()
147 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
148 TAILQ_INIT(&queue->tq_active); in _taskqueue_create()
149 queue->tq_enqueue = enqueue; in _taskqueue_create()
150 queue->tq_context = context; in _taskqueue_create()
151 queue->tq_name = tq_name; in _taskqueue_create()
152 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
153 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
158 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
159 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
161 return (queue); in _taskqueue_create()
174 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
183 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
186 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
187 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
204 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
207 TQ_LOCK(queue); in taskqueue_free()
208 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
209 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
210 KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
211 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
212 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
213 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
214 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
215 free(queue, M_TASKQUEUE); in taskqueue_free()
219 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) in taskqueue_enqueue_locked() argument
231 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
238 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
240 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
243 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; in taskqueue_enqueue_locked()
249 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
251 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
255 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
256 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
257 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
258 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
259 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
260 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
267 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
271 TQ_LOCK(queue); in taskqueue_enqueue()
272 res = taskqueue_enqueue_locked(queue, task); in taskqueue_enqueue()
281 struct taskqueue *queue; in taskqueue_timeout_func() local
285 queue = timeout_task->q; in taskqueue_timeout_func()
288 queue->tq_callouts--; in taskqueue_timeout_func()
294 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
299 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
300 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
302 KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); in taskqueue_enqueue_timeout_sbt()
303 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
307 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
310 taskqueue_enqueue_locked(queue, &timeout_task->t); in taskqueue_enqueue_timeout_sbt()
316 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
325 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
331 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
335 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
350 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
354 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
366 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
374 TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); in taskqueue_drain_tq_queue()
384 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
388 if (TAILQ_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
392 queue->tq_callouts++; in taskqueue_drain_tq_active()
399 TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); in taskqueue_drain_tq_active()
400 while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) in taskqueue_drain_tq_active()
401 TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); in taskqueue_drain_tq_active()
402 TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); in taskqueue_drain_tq_active()
408 tb_first = TAILQ_FIRST(&queue->tq_active); in taskqueue_drain_tq_active()
413 queue->tq_callouts--; in taskqueue_drain_tq_active()
414 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
415 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
420 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
423 TQ_LOCK(queue); in taskqueue_block()
424 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
425 TQ_UNLOCK(queue); in taskqueue_block()
429 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
432 TQ_LOCK(queue); in taskqueue_unblock()
433 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
434 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
435 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
436 TQ_UNLOCK(queue); in taskqueue_unblock()
440 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
447 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
448 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
451 while (STAILQ_FIRST(&queue->tq_queue)) { in taskqueue_run_locked()
452 TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
458 task = STAILQ_FIRST(&queue->tq_queue); in taskqueue_run_locked()
460 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
464 TQ_UNLOCK(queue); in taskqueue_run_locked()
469 TQ_LOCK(queue); in taskqueue_run_locked()
473 TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
474 tb_first = TAILQ_FIRST(&queue->tq_active); in taskqueue_run_locked()
482 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
485 TQ_LOCK(queue); in taskqueue_run()
486 taskqueue_run_locked(queue); in taskqueue_run()
487 TQ_UNLOCK(queue); in taskqueue_run()
491 task_is_running(struct taskqueue *queue, struct task *task) in task_is_running() argument
495 TQ_ASSERT_LOCKED(queue); in task_is_running()
496 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { in task_is_running()
509 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
513 TQ_LOCK(queue); in taskqueue_poll_is_busy()
514 retval = task->ta_pending > 0 || task_is_running(queue, task); in taskqueue_poll_is_busy()
515 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
521 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
526 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
530 return (task_is_running(queue, task) ? EBUSY : 0); in taskqueue_cancel_locked()
534 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
538 TQ_LOCK(queue); in taskqueue_cancel()
539 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
540 TQ_UNLOCK(queue); in taskqueue_cancel()
546 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
552 TQ_LOCK(queue); in taskqueue_cancel_timeout()
554 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
557 queue->tq_callouts--; in taskqueue_cancel_timeout()
559 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
567 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
570 if (!queue->tq_spin) in taskqueue_drain()
573 TQ_LOCK(queue); in taskqueue_drain()
574 while (task->ta_pending != 0 || task_is_running(queue, task)) in taskqueue_drain()
575 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); in taskqueue_drain()
576 TQ_UNLOCK(queue); in taskqueue_drain()
580 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
583 if (!queue->tq_spin) in taskqueue_drain_all()
586 TQ_LOCK(queue); in taskqueue_drain_all()
587 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
588 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
589 TQ_UNLOCK(queue); in taskqueue_drain_all()
593 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
600 TQ_LOCK(queue); in taskqueue_drain_timeout()
604 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
607 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
612 TQ_LOCK(queue); in taskqueue_drain_timeout()
614 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
618 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
622 TQ_LOCK(queue); in taskqueue_quiesce()
624 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
626 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
628 TQ_UNLOCK(queue); in taskqueue_quiesce()
847 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
852 if (queue->tq_threads[i] == NULL) in taskqueue_member()
854 if (queue->tq_threads[i] == td) { in taskqueue_member()
858 if (++j >= queue->tq_tcount) in taskqueue_member()