Lines Matching refs:queue

109 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,  in _timeout_task_init()  argument
114 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
116 timeout_task->q = queue; in _timeout_task_init()
137 struct taskqueue *queue; in _taskqueue_create() local
144 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
145 if (queue == NULL) { in _taskqueue_create()
152 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
153 LIST_INIT(&queue->tq_active); in _taskqueue_create()
154 queue->tq_enqueue = enqueue; in _taskqueue_create()
155 queue->tq_context = context; in _taskqueue_create()
156 queue->tq_name = tq_name; in _taskqueue_create()
157 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
158 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
163 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
164 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
166 return (queue); in _taskqueue_create()
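
These excerpts appear to come from the FreeBSD kernel's taskqueue implementation (sys/kern/subr_taskqueue.c); _taskqueue_create() is the shared backend behind the public taskqueue_create() and taskqueue_create_fast() constructors. A minimal caller-side sketch of the usual create-and-start sequence, assuming the standard taskqueue(9) API; the queue name "example" and the example_tq handle are hypothetical:

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/taskqueue.h>

static struct taskqueue *example_tq;	/* hypothetical queue handle */

static void
example_tq_setup(void)
{
	/*
	 * taskqueue_thread_enqueue() wakes the queue's own worker
	 * thread(s), so it takes a pointer to the handle as context.
	 */
	example_tq = taskqueue_create("example", M_WAITOK,
	    taskqueue_thread_enqueue, &example_tq);
	taskqueue_start_threads(&example_tq, 1, PWAIT, "example taskq");
}
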
179 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
188 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
191 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
192 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
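
taskqueue_set_callback() records a per-queue hook that each worker thread runs as it starts (TASKQUEUE_CALLBACK_TYPE_INIT) or exits (TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); the KASSERT rejects registering the same callback type twice. A hedged sketch, typically done after taskqueue_create() and before taskqueue_start_threads(); the example_thr_* hooks are hypothetical:

static void
example_thr_init(void *ctx)
{
	/* Runs in each worker thread as it starts. */
}

static void
example_thr_fini(void *ctx)
{
	/* Runs in each worker thread as it exits. */
}

static void
example_tq_set_callbacks(void)
{
	taskqueue_set_callback(example_tq, TASKQUEUE_CALLBACK_TYPE_INIT,
	    example_thr_init, NULL);
	taskqueue_set_callback(example_tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
	    example_thr_fini, NULL);
}
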
209 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
212 TQ_LOCK(queue); in taskqueue_free()
213 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
214 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
215 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
216 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
217 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
218 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
219 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
220 free(queue, M_TASKQUEUE); in taskqueue_free()
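
taskqueue_free() marks the queue inactive, terminates its worker threads, and asserts that nothing is still running and no timeout tasks remain armed (tq_callouts == 0), so callers are expected to cancel or drain outstanding work first. A teardown sketch continuing the hypothetical example above:

static void
example_tq_teardown(void)
{
	/*
	 * Any armed timeout tasks must already be cancelled or drained:
	 * taskqueue_free() asserts tq_callouts == 0 and that no task is
	 * still running.
	 */
	taskqueue_drain_all(example_tq);
	taskqueue_free(example_tq);
	example_tq = NULL;
}
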
224 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) in taskqueue_enqueue_locked() argument
236 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
246 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
248 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
250 prev = queue->tq_hint; in taskqueue_enqueue_locked()
255 ins = STAILQ_FIRST(&queue->tq_queue); in taskqueue_enqueue_locked()
262 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
263 queue->tq_hint = task; in taskqueue_enqueue_locked()
265 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
269 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
270 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
271 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
272 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
273 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
274 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
281 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
285 TQ_LOCK(queue); in taskqueue_enqueue()
286 res = taskqueue_enqueue_locked(queue, task); in taskqueue_enqueue()
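
taskqueue_enqueue() simply takes the queue lock and calls taskqueue_enqueue_locked(); the early TQ_UNLOCK branch above is the already-pending case, where a repeat enqueue only bumps ta_pending and the handler later sees the coalesced count in its second argument. A caller-side sketch, assuming the standard TASK_INIT() pattern and the void (*)(void *, int) handler signature; example_* names are hypothetical:

static struct task example_task;	/* hypothetical */

static void
example_task_fn(void *arg, int pending)
{
	/*
	 * "pending" is how many enqueues were coalesced into this
	 * single run of the handler.
	 */
}

static void
example_task_setup(void *softc)
{
	TASK_INIT(&example_task, 0, example_task_fn, softc);
}

static void
example_kick(void)
{
	taskqueue_enqueue(example_tq, &example_task);
}
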
295 struct taskqueue *queue; in taskqueue_timeout_func() local
299 queue = timeout_task->q; in taskqueue_timeout_func()
302 queue->tq_callouts--; in taskqueue_timeout_func()
308 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
313 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
314 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
316 KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); in taskqueue_enqueue_timeout_sbt()
317 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
321 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
324 taskqueue_enqueue_locked(queue, &timeout_task->t); in taskqueue_enqueue_timeout_sbt()
330 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
339 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
345 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
349 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
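
taskqueue_enqueue_timeout() is the ticks-based wrapper around taskqueue_enqueue_timeout_sbt(), which arms a callout that later enqueues the task; the KASSERT shows that timeout tasks are rejected on spin-mutex queues. A sketch, assuming TIMEOUT_TASK_INIT(); names are hypothetical and reuse the handler from the previous sketch:

static struct timeout_task example_ttask;	/* hypothetical */

static void
example_arm(void *softc)
{
	TIMEOUT_TASK_INIT(example_tq, &example_ttask, 0,
	    example_task_fn, softc);
	/* Run example_task_fn() roughly one second from now. */
	taskqueue_enqueue_timeout(example_tq, &example_ttask, hz);
}
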
364 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
368 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
380 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
381 queue->tq_hint = &t_barrier; in taskqueue_drain_tq_queue()
389 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); in taskqueue_drain_tq_queue()
399 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
404 if (LIST_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
408 queue->tq_callouts++; in taskqueue_drain_tq_active()
411 seq = queue->tq_seq; in taskqueue_drain_tq_active()
413 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in taskqueue_drain_tq_active()
415 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); in taskqueue_drain_tq_active()
421 queue->tq_callouts--; in taskqueue_drain_tq_active()
422 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
423 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
428 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
431 TQ_LOCK(queue); in taskqueue_block()
432 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
433 TQ_UNLOCK(queue); in taskqueue_block()
437 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
440 TQ_LOCK(queue); in taskqueue_unblock()
441 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
442 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
443 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
444 TQ_UNLOCK(queue); in taskqueue_unblock()
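
taskqueue_block() sets TQ_FLAGS_BLOCKED so that taskqueue_enqueue() stops calling the queue's tq_enqueue hook: new tasks still land on tq_queue, but the worker is no longer kicked, and anything already running is unaffected. taskqueue_unblock() clears the flag and re-kicks the queue if work accumulated. A suspend/resume-style sketch, hypothetical:

static void
example_suspend(void)
{
	/*
	 * New enqueues are still queued, but the worker is no longer
	 * notified; a task that is already running completes normally.
	 */
	taskqueue_block(example_tq);
}

static void
example_resume(void)
{
	/* Clears the flag and re-kicks the queue if work piled up. */
	taskqueue_unblock(example_tq);
}
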
448 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
456 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
457 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
459 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
462 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { in taskqueue_run_locked()
463 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
464 if (queue->tq_hint == task) in taskqueue_run_locked()
465 queue->tq_hint = NULL; in taskqueue_run_locked()
469 tb.tb_seq = ++queue->tq_seq; in taskqueue_run_locked()
470 TQ_UNLOCK(queue); in taskqueue_run_locked()
482 TQ_LOCK(queue); in taskqueue_run_locked()
491 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
494 TQ_LOCK(queue); in taskqueue_run()
495 taskqueue_run_locked(queue); in taskqueue_run()
496 TQ_UNLOCK(queue); in taskqueue_run()
500 task_is_running(struct taskqueue *queue, struct task *task) in task_is_running() argument
504 TQ_ASSERT_LOCKED(queue); in task_is_running()
505 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in task_is_running()
518 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
522 TQ_LOCK(queue); in taskqueue_poll_is_busy()
523 retval = task->ta_pending > 0 || task_is_running(queue, task); in taskqueue_poll_is_busy()
524 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
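
taskqueue_poll_is_busy() is the non-blocking counterpart to taskqueue_drain(): under the queue lock it merely reports whether the task is still pending or currently executing, without waiting. A sketch of a status-path check, hypothetical names:

static int
example_is_busy(void)
{
	/* Non-blocking: true while the task is pending or running. */
	return (taskqueue_poll_is_busy(example_tq, &example_task));
}
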
530 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
535 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
536 if (queue->tq_hint == task) in taskqueue_cancel_locked()
537 queue->tq_hint = NULL; in taskqueue_cancel_locked()
542 return (task_is_running(queue, task) ? EBUSY : 0); in taskqueue_cancel_locked()
546 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
550 TQ_LOCK(queue); in taskqueue_cancel()
551 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
552 TQ_UNLOCK(queue); in taskqueue_cancel()
558 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
564 TQ_LOCK(queue); in taskqueue_cancel_timeout()
566 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
569 queue->tq_callouts--; in taskqueue_cancel_timeout()
571 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
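
Both cancel paths return EBUSY when task_is_running() finds the handler currently executing; in that case only a drain can wait for it to finish, and for the timeout variant a successfully stopped callout is also folded into the pending count. A common detach-path sketch, hypothetical names:

static void
example_cancel(void)
{
	if (taskqueue_cancel_timeout(example_tq, &example_ttask, NULL) != 0) {
		/* EBUSY: the handler is running right now; wait it out. */
		taskqueue_drain_timeout(example_tq, &example_ttask);
	}
}
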
579 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
582 if (!queue->tq_spin) in taskqueue_drain()
585 TQ_LOCK(queue); in taskqueue_drain()
586 while (task->ta_pending != 0 || task_is_running(queue, task)) in taskqueue_drain()
587 TQ_SLEEP(queue, task, "tq_drain"); in taskqueue_drain()
588 TQ_UNLOCK(queue); in taskqueue_drain()
592 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
595 if (!queue->tq_spin) in taskqueue_drain_all()
598 TQ_LOCK(queue); in taskqueue_drain_all()
599 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
600 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
601 TQ_UNLOCK(queue); in taskqueue_drain_all()
605 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
612 TQ_LOCK(queue); in taskqueue_drain_timeout()
616 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
619 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
624 TQ_LOCK(queue); in taskqueue_drain_timeout()
626 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
630 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
634 TQ_LOCK(queue); in taskqueue_quiesce()
636 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
638 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
640 TQ_UNLOCK(queue); in taskqueue_quiesce()
885 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
890 if (queue->tq_threads[i] == NULL) in taskqueue_member()
892 if (queue->tq_threads[i] == td) { in taskqueue_member()
896 if (++j >= queue->tq_tcount) in taskqueue_member()
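
taskqueue_member() walks tq_threads to report whether a given thread is one of the queue's workers. A typical (hypothetical) use is guarding against calling taskqueue_drain() from the queue's own thread, which would sleep on work that only that thread could finish:

static void
example_safe_drain(void)
{
	/* Draining from one of the queue's own threads would deadlock. */
	if (!taskqueue_member(example_tq, curthread))
		taskqueue_drain(example_tq, &example_task);
}
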