diff options
Diffstat (limited to 'fs/io-wq.c')
| -rw-r--r-- | fs/io-wq.c | 193 |
1 files changed, 107 insertions, 86 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 5c4f582d6549..5b93fa67d346 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -48,7 +48,8 @@ struct io_worker { struct io_wqe *wqe; struct io_wq_work *cur_work; - spinlock_t lock; + struct io_wq_work *next_work; + raw_spinlock_t lock; struct completion ref_done; @@ -75,6 +76,7 @@ struct io_wqe_acct { unsigned max_workers; int index; atomic_t nr_running; + raw_spinlock_t lock; struct io_wq_work_list work_list; unsigned long flags; }; @@ -90,7 +92,7 @@ enum { */ struct io_wqe { raw_spinlock_t lock; - struct io_wqe_acct acct[2]; + struct io_wqe_acct acct[IO_WQ_ACCT_NR]; int node; @@ -223,12 +225,12 @@ static void io_worker_exit(struct io_worker *worker) if (worker->flags & IO_WORKER_F_FREE) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - preempt_disable(); + raw_spin_unlock(&wqe->lock); io_wqe_dec_running(worker); worker->flags = 0; + preempt_disable(); current->flags &= ~PF_IO_WORKER; preempt_enable(); - raw_spin_unlock(&wqe->lock); kfree_rcu(worker, rcu); io_worker_ref_put(wqe->wq); @@ -237,10 +239,15 @@ static void io_worker_exit(struct io_worker *worker) static inline bool io_acct_run_queue(struct io_wqe_acct *acct) { + bool ret = false; + + raw_spin_lock(&acct->lock); if (!wq_list_empty(&acct->work_list) && !test_bit(IO_ACCT_STALLED_BIT, &acct->flags)) - return true; - return false; + ret = true; + raw_spin_unlock(&acct->lock); + + return ret; } /* @@ -384,7 +391,6 @@ fail: } static void io_wqe_dec_running(struct io_worker *worker) - __must_hold(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; @@ -392,26 +398,27 @@ static void io_wqe_dec_running(struct io_worker *worker) if (!(worker->flags & IO_WORKER_F_UP)) return; - if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - raw_spin_unlock(&wqe->lock); - io_queue_worker_create(worker, acct, create_worker_cb); - raw_spin_lock(&wqe->lock); - } + if (!atomic_dec_and_test(&acct->nr_running)) + return; + if (!io_acct_run_queue(acct)) + return; + + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + io_queue_worker_create(worker, acct, create_worker_cb); } /* * Worker will start processing some work. Move it to the busy list, if * it's currently on the freelist */ -static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, - struct io_wq_work *work) - __must_hold(wqe->lock) +static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker) { if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; + raw_spin_lock(&wqe->lock); hlist_nulls_del_init_rcu(&worker->nulls_node); + raw_spin_unlock(&wqe->lock); } } @@ -456,7 +463,7 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, struct io_worker *worker) - __must_hold(wqe->lock) + __must_hold(acct->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; @@ -498,9 +505,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, * work being added and clearing the stalled bit. */ set_bit(IO_ACCT_STALLED_BIT, &acct->flags); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&acct->lock); unstalled = io_wait_on_hash(wqe, stall_hash); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&acct->lock); if (unstalled) { clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); if (wq_has_sleeper(&wqe->wq->hash->wait)) @@ -529,15 +536,15 @@ static void io_assign_current_work(struct io_worker *worker, cond_resched(); } - spin_lock(&worker->lock); + raw_spin_lock(&worker->lock); worker->cur_work = work; - spin_unlock(&worker->lock); + worker->next_work = NULL; + raw_spin_unlock(&worker->lock); } static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); static void io_worker_handle_work(struct io_worker *worker) - __releases(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; @@ -546,7 +553,7 @@ static void io_worker_handle_work(struct io_worker *worker) do { struct io_wq_work *work; -get_next: + /* * If we got some work, mark us as busy. If we didn't, but * the list isn't empty, it means we stalled on hashed work. @@ -554,13 +561,25 @@ get_next: * can't make progress, any work completion or insertion will * clear the stalled flag. */ + raw_spin_lock(&acct->lock); work = io_get_next_work(acct, worker); - if (work) - __io_worker_busy(wqe, worker, work); - - raw_spin_unlock(&wqe->lock); - if (!work) + raw_spin_unlock(&acct->lock); + if (work) { + __io_worker_busy(wqe, worker); + + /* + * Make sure cancelation can find this, even before + * it becomes the active work. That avoids a window + * where the work has been removed from our general + * work list, but isn't yet discoverable as the + * current work item for this worker. + */ + raw_spin_lock(&worker->lock); + worker->next_work = work; + raw_spin_unlock(&worker->lock); + } else { break; + } io_assign_current_work(worker, work); __set_current_state(TASK_RUNNING); @@ -594,15 +613,8 @@ get_next: spin_unlock_irq(&wq->hash->wait.lock); if (wq_has_sleeper(&wq->hash->wait)) wake_up(&wq->hash->wait); - raw_spin_lock(&wqe->lock); - /* skip unnecessary unlock-lock wqe->lock */ - if (!work) - goto get_next; - raw_spin_unlock(&wqe->lock); } } while (work); - - raw_spin_lock(&wqe->lock); } while (1); } @@ -626,12 +638,10 @@ static int io_wqe_worker(void *data) long ret; set_current_state(TASK_INTERRUPTIBLE); -loop: - raw_spin_lock(&wqe->lock); - if (io_acct_run_queue(acct)) { + while (io_acct_run_queue(acct)) io_worker_handle_work(worker); - goto loop; - } + + raw_spin_lock(&wqe->lock); /* timed out, exit unless we're the last worker */ if (last_timeout && acct->nr_workers > 1) { acct->nr_workers--; @@ -655,10 +665,8 @@ loop: last_timeout = !ret; } - if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { - raw_spin_lock(&wqe->lock); + if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) io_worker_handle_work(worker); - } audit_free(current); io_worker_exit(worker); @@ -670,7 +678,7 @@ loop: */ void io_wq_worker_running(struct task_struct *tsk) { - struct io_worker *worker = tsk->pf_io_worker; + struct io_worker *worker = tsk->worker_private; if (!worker) return; @@ -688,7 +696,7 @@ void io_wq_worker_running(struct task_struct *tsk) */ void io_wq_worker_sleeping(struct task_struct *tsk) { - struct io_worker *worker = tsk->pf_io_worker; + struct io_worker *worker = tsk->worker_private; if (!worker) return; @@ -698,16 +706,13 @@ void io_wq_worker_sleeping(struct task_struct *tsk) return; worker->flags &= ~IO_WORKER_F_RUNNING; - - raw_spin_lock(&worker->wqe->lock); io_wqe_dec_running(worker); - raw_spin_unlock(&worker->wqe->lock); } static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, struct task_struct *tsk) { - tsk->pf_io_worker = worker; + tsk->worker_private = worker; worker->task = tsk; set_cpus_allowed_ptr(tsk, wqe->cpu_mask); tsk->flags |= PF_NO_SETAFFINITY; @@ -771,10 +776,12 @@ static void create_worker_cont(struct callback_head *cb) .cancel_all = true, }; + raw_spin_unlock(&wqe->lock); while (io_acct_cancel_pending_work(wqe, acct, &match)) - raw_spin_lock(&wqe->lock); + ; + } else { + raw_spin_unlock(&wqe->lock); } - raw_spin_unlock(&wqe->lock); io_worker_ref_put(wqe->wq); kfree(worker); return; @@ -815,7 +822,7 @@ fail: refcount_set(&worker->ref, 1); worker->wqe = wqe; - spin_lock_init(&worker->lock); + raw_spin_lock_init(&worker->lock); init_completion(&worker->ref_done); if (index == IO_WQ_ACCT_BOUND) @@ -907,6 +914,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + struct io_cb_cancel_data match; unsigned work_flags = work->flags; bool do_create; @@ -920,10 +928,12 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) return; } - raw_spin_lock(&wqe->lock); + raw_spin_lock(&acct->lock); io_wqe_insert_work(wqe, work); clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); + raw_spin_unlock(&acct->lock); + raw_spin_lock(&wqe->lock); rcu_read_lock(); do_create = !io_wqe_activate_free_worker(wqe, acct); rcu_read_unlock(); @@ -939,18 +949,18 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) return; raw_spin_lock(&wqe->lock); - /* fatal condition, failed to create the first worker */ - if (!acct->nr_workers) { - struct io_cb_cancel_data match = { - .fn = io_wq_work_match_item, - .data = work, - .cancel_all = false, - }; - - if (io_acct_cancel_pending_work(wqe, acct, &match)) - raw_spin_lock(&wqe->lock); + if (acct->nr_workers) { + raw_spin_unlock(&wqe->lock); + return; } raw_spin_unlock(&wqe->lock); + + /* fatal condition, failed to create the first worker */ + match.fn = io_wq_work_match_item, + match.data = work, + match.cancel_all = false, + + io_acct_cancel_pending_work(wqe, acct, &match); } } @@ -973,6 +983,19 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); } +static bool __io_wq_worker_cancel(struct io_worker *worker, + struct io_cb_cancel_data *match, + struct io_wq_work *work) +{ + if (work && match->fn(work, match->data)) { + work->flags |= IO_WQ_WORK_CANCEL; + set_notify_signal(worker->task); + return true; + } + + return false; +} + static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { struct io_cb_cancel_data *match = data; @@ -981,13 +1004,11 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) * Hold the lock to avoid ->cur_work going out of scope, caller * may dereference the passed in work. */ - spin_lock(&worker->lock); - if (worker->cur_work && - match->fn(worker->cur_work, match->data)) { - set_notify_signal(worker->task); + raw_spin_lock(&worker->lock); + if (__io_wq_worker_cancel(worker, match, worker->cur_work) || + __io_wq_worker_cancel(worker, match, worker->next_work)) match->nr_running++; - } - spin_unlock(&worker->lock); + raw_spin_unlock(&worker->lock); return match->nr_running && !match->cancel_all; } @@ -1014,22 +1035,23 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, static bool io_acct_cancel_pending_work(struct io_wqe *wqe, struct io_wqe_acct *acct, struct io_cb_cancel_data *match) - __releases(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; + raw_spin_lock(&acct->lock); wq_list_for_each(node, prev, &acct->work_list) { work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; io_wqe_remove_pending(wqe, work, prev); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&acct->lock); io_run_cancel(work, wqe); match->nr_pending++; /* not safe to continue after unlock */ return true; } + raw_spin_unlock(&acct->lock); return false; } @@ -1039,17 +1061,15 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe, { int i; retry: - raw_spin_lock(&wqe->lock); for (i = 0; i < IO_WQ_ACCT_NR; i++) { struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); if (io_acct_cancel_pending_work(wqe, acct, match)) { if (match->cancel_all) goto retry; - return; + break; } } - raw_spin_unlock(&wqe->lock); } static void io_wqe_cancel_running_work(struct io_wqe *wqe, @@ -1074,6 +1094,14 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, * First check pending list, if we're lucky we can just remove it * from there. CANCEL_OK means that the work is returned as-new, * no completion will be posted for it. + * + * Then check if a free (going busy) or busy worker has the work + * currently running. If we find it there, we'll return CANCEL_RUNNING + * as an indication that we attempt to signal cancellation. The + * completion will run normally in this case. + * + * Do both of these while holding the wqe->lock, to ensure that + * we'll find a work item regardless of state. */ for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; @@ -1081,18 +1109,10 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, io_wqe_cancel_pending_work(wqe, &match); if (match.nr_pending && !match.cancel_all) return IO_WQ_CANCEL_OK; - } - - /* - * Now check if a free (going busy) or busy worker has the work - * currently running. If we find it there, we'll return CANCEL_RUNNING - * as an indication that we attempt to signal cancellation. The - * completion will run normally in this case. - */ - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; + raw_spin_lock(&wqe->lock); io_wqe_cancel_running_work(wqe, &match); + raw_spin_unlock(&wqe->lock); if (match.nr_running && !match.cancel_all) return IO_WQ_CANCEL_RUNNING; } @@ -1171,6 +1191,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) acct->index = i; atomic_set(&acct->nr_running, 0); INIT_WQ_LIST(&acct->work_list); + raw_spin_lock_init(&acct->lock); } wqe->wq = wq; raw_spin_lock_init(&wqe->lock); @@ -1355,7 +1376,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2); - for (i = 0; i < 2; i++) { + for (i = 0; i < IO_WQ_ACCT_NR; i++) { if (new_count[i] > task_rlimit(current, RLIMIT_NPROC)) new_count[i] = task_rlimit(current, RLIMIT_NPROC); } |