diff --git a/fs/io-wq.c b/fs/io-wq.c index 5facea5ac056..636461142f6f 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -34,6 +34,7 @@ enum { enum { IO_WQ_BIT_EXIT = 0, /* wq exiting */ IO_WQ_BIT_CANCEL = 1, /* cancel work on list */ + IO_WQ_BIT_ERROR = 2, /* error on setup */ }; enum { @@ -574,14 +575,14 @@ void io_wq_worker_sleeping(struct task_struct *tsk) spin_unlock_irq(&wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { struct io_wqe_acct *acct =&wqe->acct[index]; struct io_worker *worker; worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node); if (!worker) - return; + return false; refcount_set(&worker->ref, 1); worker->nulls_node.pprev = NULL; @@ -592,7 +593,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) "io_wqe_worker-%d/%d", index, wqe->node); if (IS_ERR(worker->task)) { kfree(worker); - return; + return false; } spin_lock_irq(&wqe->lock); @@ -609,6 +610,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) atomic_inc(&wq->user->processes); wake_up_process(worker->task); + return true; } static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) @@ -616,9 +618,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) { struct io_wqe_acct *acct = &wqe->acct[index]; - /* always ensure we have one bounded worker */ - if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers) - return true; /* if we have available workers or no work, no need */ if (!hlist_nulls_empty(&wqe->free_list.head) || !io_wqe_run_queue(wqe)) return false; @@ -631,10 +630,19 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) static int io_wq_manager(void *data) { struct io_wq *wq = data; + int i; - while (!kthread_should_stop()) { - int i; + /* create fixed workers */ + refcount_set(&wq->refs, wq->nr_wqes); + for (i = 0; i < wq->nr_wqes; i++) { + if (create_io_worker(wq, wq->wqes[i], IO_WQ_ACCT_BOUND)) + continue; + goto err; + } + complete(&wq->done); + + while (!kthread_should_stop()) { for (i = 0; i < wq->nr_wqes; i++) { struct io_wqe *wqe = wq->wqes[i]; bool fork_worker[2] = { false, false }; @@ -655,6 +663,12 @@ static int io_wq_manager(void *data) } return 0; +err: + set_bit(IO_WQ_BIT_ERROR, &wq->state); + set_bit(IO_WQ_BIT_EXIT, &wq->state); + if (refcount_sub_and_test(wq->nr_wqes - i, &wq->refs)) + complete(&wq->done); + return 0; } static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, @@ -1006,7 +1020,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wq->creds = data->creds; i = 0; - refcount_set(&wq->refs, wq->nr_wqes); for_each_online_node(node) { struct io_wqe *wqe; @@ -1045,14 +1058,22 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager"); if (!IS_ERR(wq->manager)) { wake_up_process(wq->manager); + wait_for_completion(&wq->done); + if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) { + ret = -ENOMEM; + goto err; + } + reinit_completion(&wq->done); return wq; } ret = PTR_ERR(wq->manager); - wq->manager = NULL; -err: complete(&wq->done); - io_wq_destroy(wq); +err: + for (i = 0; i < wq->nr_wqes; i++) + kfree(wq->wqes[i]); + kfree(wq->wqes); + kfree(wq); return ERR_PTR(ret); } @@ -1066,10 +1087,9 @@ void io_wq_destroy(struct io_wq *wq) { int i; - if (wq->manager) { - set_bit(IO_WQ_BIT_EXIT, &wq->state); + set_bit(IO_WQ_BIT_EXIT, &wq->state); + if (wq->manager) kthread_stop(wq->manager); - } rcu_read_lock(); for (i = 0; i < wq->nr_wqes; i++) {