diff options
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r-- | fs/io_uring.c | 682 |
1 files changed, 450 insertions, 232 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c index 2a3af95be4ca..91e2cc8414f9 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -508,9 +508,9 @@ struct io_async_msghdr { struct io_async_rw { struct iovec fast_iov[UIO_FASTIOV]; - struct iovec *iov; - ssize_t nr_segs; - ssize_t size; + const struct iovec *free_iovec; + struct iov_iter iter; + size_t bytes_done; struct wait_page_queue wpq; }; @@ -540,7 +540,6 @@ enum { REQ_F_ISREG_BIT, REQ_F_COMP_LOCKED_BIT, REQ_F_NEED_CLEANUP_BIT, - REQ_F_OVERFLOW_BIT, REQ_F_POLLED_BIT, REQ_F_BUFFER_SELECTED_BIT, REQ_F_NO_FILE_TABLE_BIT, @@ -583,8 +582,6 @@ enum { REQ_F_COMP_LOCKED = BIT(REQ_F_COMP_LOCKED_BIT), /* needs cleanup */ REQ_F_NEED_CLEANUP = BIT(REQ_F_NEED_CLEANUP_BIT), - /* in overflow list */ - REQ_F_OVERFLOW = BIT(REQ_F_OVERFLOW_BIT), /* already went through poll handler */ REQ_F_POLLED = BIT(REQ_F_POLLED_BIT), /* buffer already selected */ @@ -898,6 +895,7 @@ static void io_put_req(struct io_kiocb *req); static void io_double_put_req(struct io_kiocb *req); static void __io_double_put_req(struct io_kiocb *req); static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req); +static void __io_queue_linked_timeout(struct io_kiocb *req); static void io_queue_linked_timeout(struct io_kiocb *req); static int __io_sqe_files_update(struct io_ring_ctx *ctx, struct io_uring_files_update *ip, @@ -914,9 +912,9 @@ static void io_file_put_work(struct work_struct *work); static ssize_t io_import_iovec(int rw, struct io_kiocb *req, struct iovec **iovec, struct iov_iter *iter, bool needs_lock); -static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter); +static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, + struct iov_iter *iter, bool force); static struct kmem_cache *req_cachep; @@ -945,7 +943,8 @@ static void io_get_req_task(struct io_kiocb *req) static inline void io_clean_op(struct io_kiocb *req) { - if (req->flags & (REQ_F_NEED_CLEANUP | REQ_F_BUFFER_SELECTED)) + if (req->flags & (REQ_F_NEED_CLEANUP | REQ_F_BUFFER_SELECTED | + REQ_F_INFLIGHT)) __io_clean_op(req); } @@ -1107,10 +1106,16 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx) } } -static void io_req_clean_work(struct io_kiocb *req) +/* + * Returns true if we need to defer file table putting. This can only happen + * from the error path with REQ_F_COMP_LOCKED set. + */ +static bool io_req_clean_work(struct io_kiocb *req) { if (!(req->flags & REQ_F_WORK_INITIALIZED)) - return; + return false; + + req->flags &= ~REQ_F_WORK_INITIALIZED; if (req->work.mm) { mmdrop(req->work.mm); @@ -1123,6 +1128,9 @@ static void io_req_clean_work(struct io_kiocb *req) if (req->work.fs) { struct fs_struct *fs = req->work.fs; + if (req->flags & REQ_F_COMP_LOCKED) + return true; + spin_lock(&req->work.fs->lock); if (--fs->users) fs = NULL; @@ -1131,7 +1139,8 @@ static void io_req_clean_work(struct io_kiocb *req) free_fs_struct(fs); req->work.fs = NULL; } - req->flags &= ~REQ_F_WORK_INITIALIZED; + + return false; } static void io_prep_async_work(struct io_kiocb *req) @@ -1179,7 +1188,7 @@ static void io_prep_async_link(struct io_kiocb *req) io_prep_async_work(cur); } -static void __io_queue_async_work(struct io_kiocb *req) +static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link = io_prep_linked_timeout(req); @@ -1187,16 +1196,19 @@ static void __io_queue_async_work(struct io_kiocb *req) trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, &req->work, req->flags); io_wq_enqueue(ctx->io_wq, &req->work); - - if (link) - io_queue_linked_timeout(link); + return link; } static void io_queue_async_work(struct io_kiocb *req) { + struct io_kiocb *link; + /* init ->work of the whole link before punting */ io_prep_async_link(req); - __io_queue_async_work(req); + link = __io_queue_async_work(req); + + if (link) + io_queue_linked_timeout(link); } static void io_kill_timeout(struct io_kiocb *req) @@ -1229,12 +1241,19 @@ static void __io_queue_deferred(struct io_ring_ctx *ctx) do { struct io_defer_entry *de = list_first_entry(&ctx->defer_list, struct io_defer_entry, list); + struct io_kiocb *link; if (req_need_defer(de->req, de->seq)) break; list_del_init(&de->list); /* punt-init is done before queueing for defer */ - __io_queue_async_work(de->req); + link = __io_queue_async_work(de->req); + if (link) { + __io_queue_linked_timeout(link); + /* drop submission reference */ + link->flags |= REQ_F_COMP_LOCKED; + io_put_req(link); + } kfree(de); } while (!list_empty(&ctx->defer_list)); } @@ -1345,7 +1364,6 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb, compl.list); list_move(&req->compl.list, &list); - req->flags &= ~REQ_F_OVERFLOW; if (cqe) { WRITE_ONCE(cqe->user_data, req->user_data); WRITE_ONCE(cqe->res, req->result); @@ -1398,7 +1416,6 @@ static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags) ctx->rings->sq_flags |= IORING_SQ_CQ_OVERFLOW; } io_clean_op(req); - req->flags |= REQ_F_OVERFLOW; req->result = res; req->compl.cflags = cflags; refcount_inc(&req->refs); @@ -1533,7 +1550,7 @@ static inline void io_put_file(struct io_kiocb *req, struct file *file, fput(file); } -static void io_dismantle_req(struct io_kiocb *req) +static bool io_dismantle_req(struct io_kiocb *req) { io_clean_op(req); @@ -1541,27 +1558,15 @@ static void io_dismantle_req(struct io_kiocb *req) kfree(req->io); if (req->file) io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE)); - io_req_clean_work(req); - - if (req->flags & REQ_F_INFLIGHT) { - struct io_ring_ctx *ctx = req->ctx; - unsigned long flags; - spin_lock_irqsave(&ctx->inflight_lock, flags); - list_del(&req->inflight_entry); - if (waitqueue_active(&ctx->inflight_wait)) - wake_up(&ctx->inflight_wait); - spin_unlock_irqrestore(&ctx->inflight_lock, flags); - } + return io_req_clean_work(req); } -static void __io_free_req(struct io_kiocb *req) +static void __io_free_req_finish(struct io_kiocb *req) { - struct io_ring_ctx *ctx; + struct io_ring_ctx *ctx = req->ctx; - io_dismantle_req(req); __io_put_req_task(req); - ctx = req->ctx; if (likely(!io_is_fallback_req(req))) kmem_cache_free(req_cachep, req); else @@ -1569,6 +1574,39 @@ static void __io_free_req(struct io_kiocb *req) percpu_ref_put(&ctx->refs); } +static void io_req_task_file_table_put(struct callback_head *cb) +{ + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct fs_struct *fs = req->work.fs; + + spin_lock(&req->work.fs->lock); + if (--fs->users) + fs = NULL; + spin_unlock(&req->work.fs->lock); + if (fs) + free_fs_struct(fs); + req->work.fs = NULL; + __io_free_req_finish(req); +} + +static void __io_free_req(struct io_kiocb *req) +{ + if (!io_dismantle_req(req)) { + __io_free_req_finish(req); + } else { + int ret; + + init_task_work(&req->task_work, io_req_task_file_table_put); + ret = task_work_add(req->task, &req->task_work, TWA_RESUME); + if (unlikely(ret)) { + struct task_struct *tsk; + + tsk = io_wq_get_task(req->ctx->io_wq); + task_work_add(tsk, &req->task_work, 0); + } + } +} + static bool io_link_cancel_timeout(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; @@ -1598,6 +1636,7 @@ static bool __io_kill_linked_timeout(struct io_kiocb *req) return false; list_del_init(&link->link_list); + link->flags |= REQ_F_COMP_LOCKED; wake_ev = io_link_cancel_timeout(link); req->flags &= ~REQ_F_LINK_TIMEOUT; return wake_ev; @@ -1656,6 +1695,7 @@ static void __io_fail_links(struct io_kiocb *req) trace_io_uring_fail_link(req, link); io_cqring_fill_event(link, -ECANCELED); + link->flags |= REQ_F_COMP_LOCKED; __io_double_put_req(link); req->flags &= ~REQ_F_LINK_TIMEOUT; } @@ -1710,22 +1750,22 @@ static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb) { struct task_struct *tsk = req->task; struct io_ring_ctx *ctx = req->ctx; - int ret, notify = TWA_RESUME; + int ret, notify; /* - * SQPOLL kernel thread doesn't need notification, just a wakeup. - * If we're not using an eventfd, then TWA_RESUME is always fine, - * as we won't have dependencies between request completions for - * other kernel wait conditions. + * SQPOLL kernel thread doesn't need notification, just a wakeup. For + * all other cases, use TWA_SIGNAL unconditionally to ensure we're + * processing task_work. There's no reliable way to tell if TWA_RESUME + * will do the job. */ - if (ctx->flags & IORING_SETUP_SQPOLL) - notify = 0; - else if (ctx->cq_ev_fd) + notify = 0; + if (!(ctx->flags & IORING_SETUP_SQPOLL)) notify = TWA_SIGNAL; ret = task_work_add(tsk, cb, notify); if (!ret) wake_up_process(tsk); + return ret; } @@ -1766,8 +1806,10 @@ static void __io_req_task_submit(struct io_kiocb *req) static void io_req_task_submit(struct callback_head *cb) { struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_ring_ctx *ctx = req->ctx; __io_req_task_submit(req); + percpu_ref_put(&ctx->refs); } static void io_req_task_queue(struct io_kiocb *req) @@ -1775,6 +1817,7 @@ static void io_req_task_queue(struct io_kiocb *req) int ret; init_task_work(&req->task_work, io_req_task_submit); + percpu_ref_get(&req->ctx->refs); ret = io_req_task_work_add(req, &req->task_work); if (unlikely(ret)) { @@ -1855,7 +1898,7 @@ static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req) req->flags &= ~REQ_F_TASK_PINNED; } - io_dismantle_req(req); + WARN_ON_ONCE(io_dismantle_req(req)); rb->reqs[rb->to_free++] = req; if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs))) __io_req_free_batch_flush(req->ctx, rb); @@ -2241,7 +2284,7 @@ static bool io_resubmit_prep(struct io_kiocb *req, int error) ret = io_import_iovec(rw, req, &iovec, &iter, false); if (ret < 0) goto end_req; - ret = io_setup_async_rw(req, ret, iovec, inline_vecs, &iter); + ret = io_setup_async_rw(req, iovec, inline_vecs, &iter, false); if (!ret) return true; kfree(iovec); @@ -2263,6 +2306,8 @@ static void io_rw_resubmit(struct callback_head *cb) refcount_inc(&req->refs); io_queue_async_work(req); } + + percpu_ref_put(&ctx->refs); } #endif @@ -2275,6 +2320,8 @@ static bool io_rw_reissue(struct io_kiocb *req, long res) return false; init_task_work(&req->task_work, io_rw_resubmit); + percpu_ref_get(&req->ctx->refs); + ret = io_req_task_work_add(req, &req->task_work); if (!ret) return true; @@ -2527,6 +2574,14 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret, { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); + /* add previously done IO, if any */ + if (req->io && req->io->rw.bytes_done > 0) { + if (ret < 0) + ret = req->io->rw.bytes_done; + else + ret += req->io->rw.bytes_done; + } + if (req->flags & REQ_F_CUR_POS) req->file->f_pos = kiocb->ki_pos; if (ret >= 0 && kiocb->ki_complete == io_complete_rw) @@ -2749,9 +2804,9 @@ static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov, return __io_iov_buffer_select(req, iov, needs_lock); } -static ssize_t io_import_iovec(int rw, struct io_kiocb *req, - struct iovec **iovec, struct iov_iter *iter, - bool needs_lock) +static ssize_t __io_import_iovec(int rw, struct io_kiocb *req, + struct iovec **iovec, struct iov_iter *iter, + bool needs_lock) { void __user *buf = u64_to_user_ptr(req->rw.addr); size_t sqe_len = req->rw.len; @@ -2771,10 +2826,8 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) { if (req->flags & REQ_F_BUFFER_SELECT) { buf = io_rw_buffer_select(req, &sqe_len, needs_lock); - if (IS_ERR(buf)) { - *iovec = NULL; + if (IS_ERR(buf)) return PTR_ERR(buf); - } req->rw.len = sqe_len; } @@ -2783,14 +2836,6 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return ret < 0 ? ret : sqe_len; } - if (req->io) { - struct io_async_rw *iorw = &req->io->rw; - - iov_iter_init(iter, rw, iorw->iov, iorw->nr_segs, iorw->size); - *iovec = NULL; - return iorw->size; - } - if (req->flags & REQ_F_BUFFER_SELECT) { ret = io_iov_buffer_select(req, *iovec, needs_lock); if (!ret) { @@ -2810,6 +2855,16 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req, return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); } +static ssize_t io_import_iovec(int rw, struct io_kiocb *req, + struct iovec **iovec, struct iov_iter *iter, + bool needs_lock) +{ + if (!req->io) + return __io_import_iovec(rw, req, iovec, iter, needs_lock); + *iovec = NULL; + return iov_iter_count(&req->io->rw.iter); +} + /* * For files that don't have ->read_iter() and ->write_iter(), handle them * by looping over ->read() or ->write() manually. @@ -2868,21 +2923,30 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb, return ret; } -static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter) +static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, struct iov_iter *iter) { struct io_async_rw *rw = &req->io->rw; - rw->nr_segs = iter->nr_segs; - rw->size = io_size; + memcpy(&rw->iter, iter, sizeof(*iter)); + rw->free_iovec = NULL; + rw->bytes_done = 0; + /* can only be fixed buffers, no need to do anything */ + if (iter->type == ITER_BVEC) + return; if (!iovec) { - rw->iov = rw->fast_iov; - if (rw->iov != fast_iov) - memcpy(rw->iov, fast_iov, + unsigned iov_off = 0; + + rw->iter.iov = rw->fast_iov; + if (iter->iov != fast_iov) { + iov_off = iter->iov - fast_iov; + rw->iter.iov += iov_off; + } + if (rw->fast_iov != fast_iov) + memcpy(rw->fast_iov + iov_off, fast_iov + iov_off, sizeof(struct iovec) * iter->nr_segs); } else { - rw->iov = iovec; + rw->free_iovec = iovec; req->flags |= REQ_F_NEED_CLEANUP; } } @@ -2901,17 +2965,17 @@ static int io_alloc_async_ctx(struct io_kiocb *req) return __io_alloc_async_ctx(req); } -static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, - struct iovec *iovec, struct iovec *fast_iov, - struct iov_iter *iter) +static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec, + const struct iovec *fast_iov, + struct iov_iter *iter, bool force) { - if (!io_op_defs[req->opcode].async_ctx) + if (!force && !io_op_defs[req->opcode].async_ctx) return 0; if (!req->io) { if (__io_alloc_async_ctx(req)) return -ENOMEM; - io_req_map_rw(req, io_size, iovec, fast_iov, iter); + io_req_map_rw(req, iovec, fast_iov, iter); } return 0; } @@ -2919,18 +2983,16 @@ static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size, static inline int io_rw_prep_async(struct io_kiocb *req, int rw, bool force_nonblock) { - struct io_async_ctx *io = req->io; - struct iov_iter iter; + struct io_async_rw *iorw = &req->io->rw; ssize_t ret; - io->rw.iov = io->rw.fast_iov; - req->io = NULL; - ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock); - req->io = io; + iorw->iter.iov = iorw->fast_iov; + ret = __io_import_iovec(rw, req, (struct iovec **) &iorw->iter.iov, + &iorw->iter, !force_nonblock); if (unlikely(ret < 0)) return ret; - io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter); + io_req_map_rw(req, iorw->iter.iov, iorw->fast_iov, &iorw->iter); return 0; } @@ -2952,6 +3014,16 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, return io_rw_prep_async(req, READ, force_nonblock); } +/* + * This is our waitqueue callback handler, registered through lock_page_async() + * when we initially tried to do the IO with the iocb armed our waitqueue. + * This gets called when the page is unlocked, and we generally expect that to + * happen when the page IO is completed and the page is now uptodate. This will + * queue a task_work based retry of the operation, attempting to copy the data + * again. If the latter fails because the page was NOT uptodate, then we will + * do a thread based blocking retry of the operation. That's the unexpected + * slow path. + */ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, int sync, void *arg) { @@ -2965,13 +3037,11 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, if (!wake_page_match(wpq, key)) return 0; - /* Stop waking things up if the page is locked again */ - if (test_bit(key->bit_nr, &key->page->flags)) - return -1; - list_del_init(&wait->entry); init_task_work(&req->task_work, io_req_task_submit); + percpu_ref_get(&req->ctx->refs); + /* submit ref gets dropped, acquire a new one */ refcount_inc(&req->refs); ret = io_req_task_work_add(req, &req->task_work); @@ -2987,40 +3057,31 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode, return 1; } -static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb, - struct wait_page_queue *wait, - wait_queue_func_t func, - void *data) -{ - /* Can't support async wakeup with polled IO */ - if (kiocb->ki_flags & IOCB_HIPRI) - return -EINVAL; - if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) { - wait->wait.func = func; - wait->wait.private = data; - wait->wait.flags = 0; - INIT_LIST_HEAD(&wait->wait.entry); - kiocb->ki_flags |= IOCB_WAITQ; - kiocb->ki_waitq = wait; - return 0; - } - - return -EOPNOTSUPP; -} - - +/* + * This controls whether a given IO request should be armed for async page + * based retry. If we return false here, the request is handed to the async + * worker threads for retry. If we're doing buffered reads on a regular file, + * we prepare a private wait_page_queue entry and retry the operation. This + * will either succeed because the page is now uptodate and unlocked, or it + * will register a callback when the page is unlocked at IO completion. Through + * that callback, io_uring uses task_work to setup a retry of the operation. + * That retry will attempt the buffered read again. The retry will generally + * succeed, or in rare cases where it fails, we then fall back to using the + * async worker threads for a blocking retry. + */ static bool io_rw_should_retry(struct io_kiocb *req) { + struct wait_page_queue *wait = &req->io->rw.wpq; struct kiocb *kiocb = &req->rw.kiocb; - int ret; /* never retry for NOWAIT, we just complete with -EAGAIN */ if (req->flags & REQ_F_NOWAIT) return false; - /* already tried, or we're doing O_DIRECT */ - if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ)) + /* Only for buffered IO */ + if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI)) return false; + /* * just use poll if we can, and don't attempt if the fs doesn't * support callback based unlocks @@ -3028,28 +3089,25 @@ static bool io_rw_should_retry(struct io_kiocb *req) if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC)) return false; - /* - * If request type doesn't require req->io to defer in general, - * we need to allocate it here - */ - if (!req->io && __io_alloc_async_ctx(req)) - return false; + wait->wait.func = io_async_buf_func; + wait->wait.private = req; + wait->wait.flags = 0; + INIT_LIST_HEAD(&wait->wait.entry); + kiocb->ki_flags |= IOCB_WAITQ; + kiocb->ki_waitq = wait; - ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq, - io_async_buf_func, req); - if (!ret) { - io_get_req_task(req); - return true; - } - - return false; + io_get_req_task(req); + return true; } static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter) { if (req->file->f_op->read_iter) return call_read_iter(req->file, &req->rw.kiocb, iter); - return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter); + else if (req->file->f_op->read) + return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter); + else + return -EINVAL; } static int io_read(struct io_kiocb *req, bool force_nonblock, @@ -3057,16 +3115,19 @@ static int io_read(struct io_kiocb *req, bool force_nonblock, { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; - struct iov_iter iter; - size_t iov_count; + struct iov_iter __iter, *iter = &__iter; ssize_t io_size, ret, ret2; - unsigned long nr_segs; + size_t iov_count; - ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock); + if (req->io) + iter = &req->io->rw.iter; + + ret = io_import_iovec(READ, req, &iovec, iter, !force_nonblock); if (ret < 0) return ret; io_size = ret; req->result = io_size; + ret = 0; /* Ensure we clear previously set non-block flag */ if (!force_nonblock) @@ -3076,41 +3137,72 @@ static int io_read(struct io_kiocb *req, bool force_nonblock, if (force_nonblock && !io_file_supports_async(req->file, READ)) goto copy_iov; - iov_count = iov_iter_count(&iter); - nr_segs = iter.nr_segs; + iov_count = iov_iter_count(iter); ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count); if (unlikely(ret)) goto out_free; - ret2 = io_iter_do_read(req, &iter); + ret = io_iter_do_read(req, iter); - /* Catch -EAGAIN return for forced non-blocking submission */ - if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) { - kiocb_done(kiocb, ret2, cs); - } else { - iter.count = iov_count; - iter.nr_segs = nr_segs; -copy_iov: - ret = io_setup_async_rw(req, io_size, iovec, inline_vecs, - &iter); + if (!ret) { + goto done; + } else if (ret == -EIOCBQUEUED) { + ret = 0; + goto out_free; + } else if (ret == -EAGAIN) { + if (!force_nonblock) + goto done; + ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false); if (ret) goto out_free; - /* it's copied and will be cleaned with ->io */ - iovec = NULL; - /* if we can retry, do so with the callbacks armed */ - if (io_rw_should_retry(req)) { - ret2 = io_iter_do_read(req, &iter); - if (ret2 == -EIOCBQUEUED) { - goto out_free; - } else if (ret2 != -EAGAIN) { - kiocb_done(kiocb, ret2, cs); - goto out_free; - } - } + return -EAGAIN; + } else if (ret < 0) { + goto out_free; + } + + /* read it all, or we did blocking attempt. no retry. */ + if (!iov_iter_count(iter) || !force_nonblock || + (req->file->f_flags & O_NONBLOCK)) + goto done; + + io_size -= ret; +copy_iov: + ret2 = io_setup_async_rw(req, iovec, inline_vecs, iter, true); + if (ret2) { + ret = ret2; + goto out_free; + } + /* it's copied and will be cleaned with ->io */ + iovec = NULL; + /* now use our persistent iterator, if we aren't already */ + iter = &req->io->rw.iter; +retry: + req->io->rw.bytes_done += ret; + /* if we can retry, do so with the callbacks armed */ + if (!io_rw_should_retry(req)) { kiocb->ki_flags &= ~IOCB_WAITQ; return -EAGAIN; } + + /* + * Now retry read with the IOCB_WAITQ parts set in the iocb. If we + * get -EIOCBQUEUED, then we'll get a notification when the desired + * page gets unlocked. We can also get a partial read here, and if we + * do, then just retry at the new offset. + */ + ret = io_iter_do_read(req, iter); + if (ret == -EIOCBQUEUED) { + ret = 0; + goto out_free; + } else if (ret > 0 && ret < io_size) { + /* we got some bytes, but not all. retry. */ + goto retry; + } +done: + kiocb_done(kiocb, ret, cs); + ret = 0; out_free: + /* it's reportedly faster than delegating the null check to kfree() */ if (iovec) kfree(iovec); return ret; @@ -3139,12 +3231,14 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, { struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct kiocb *kiocb = &req->rw.kiocb; - struct iov_iter iter; + struct iov_iter __iter, *iter = &__iter; size_t iov_count; ssize_t ret, ret2, io_size; - unsigned long nr_segs; - ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock); + if (req->io) + iter = &req->io->rw.iter; + + ret = io_import_iovec(WRITE, req, &iovec, iter, !force_nonblock); if (ret < 0) return ret; io_size = ret; @@ -3163,8 +3257,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, (req->flags & REQ_F_ISREG)) goto copy_iov; - iov_count = iov_iter_count(&iter); - nr_segs = iter.nr_segs; + iov_count = iov_iter_count(iter); ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count); if (unlikely(ret)) goto out_free; @@ -3185,9 +3278,11 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, kiocb->ki_flags |= IOCB_WRITE; if (req->file->f_op->write_iter) - ret2 = call_write_iter(req->file, kiocb, &iter); + ret2 = call_write_iter(req->file, kiocb, iter); + else if (req->file->f_op->write) + ret2 = loop_rw_iter(WRITE, req->file, kiocb, iter); else - ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter); + ret2 = -EINVAL; /* * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just @@ -3198,18 +3293,13 @@ static int io_write(struct io_kiocb *req, bool force_nonblock, if (!force_nonblock || ret2 != -EAGAIN) { kiocb_done(kiocb, ret2, cs); } else { - iter.count = iov_count; - iter.nr_segs = nr_segs; copy_iov: - ret = io_setup_async_rw(req, io_size, iovec, inline_vecs, - &iter); - if (ret) - goto out_free; - /* it's copied and will be cleaned with ->io */ - iovec = NULL; - return -EAGAIN; + ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false); + if (!ret) + return -EAGAIN; } out_free: + /* it's reportedly faster than delegating the null check to kfree() */ if (iovec) kfree(iovec); return ret; @@ -4488,6 +4578,8 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll, req->result = mask; init_task_work(&req->task_work, func); + percpu_ref_get(&req->ctx->refs); + /* * If this fails, then the task is exiting. When a task exits, the * work gets canceled, so just cancel this request as well instead @@ -4526,9 +4618,24 @@ static bool io_poll_rewait(struct io_kiocb *req, struct io_poll_iocb *poll) return false; } -static void io_poll_remove_double(struct io_kiocb *req, void *data) +static struct io_poll_iocb *io_poll_get_double(struct io_kiocb *req) { - struct io_poll_iocb *poll = data; + /* pure poll stashes this in ->io, poll driven retry elsewhere */ + if (req->opcode == IORING_OP_POLL_ADD) + return (struct io_poll_iocb *) req->io; + return req->apoll->double_poll; +} + +static struct io_poll_iocb *io_poll_get_single(struct io_kiocb *req) +{ + if (req->opcode == IORING_OP_POLL_ADD) + return &req->poll; + return &req->apoll->poll; +} + +static void io_poll_remove_double(struct io_kiocb *req) +{ + struct io_poll_iocb *poll = io_poll_get_double(req); lockdep_assert_held(&req->ctx->completion_lock); @@ -4548,7 +4655,7 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) { struct io_ring_ctx *ctx = req->ctx; - io_poll_remove_double(req, req->io); + io_poll_remove_double(req); req->poll.done = true; io_cqring_fill_event(req, error ? error : mangle_poll(mask)); io_commit_cqring(ctx); @@ -4575,18 +4682,20 @@ static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) static void io_poll_task_func(struct callback_head *cb) { struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *nxt = NULL; io_poll_task_handler(req, &nxt); if (nxt) __io_req_task_submit(nxt); + percpu_ref_put(&ctx->refs); } static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct io_kiocb *req = wait->private; - struct io_poll_iocb *poll = req->apoll->double_poll; + struct io_poll_iocb *poll = io_poll_get_single(req); __poll_t mask = key_to_poll(key); /* for instances that support it check for an event match first: */ @@ -4600,6 +4709,8 @@ static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode, done = list_empty(&poll->wait.entry); if (!done) list_del_init(&poll->wait.entry); + /* make sure double remove sees this as being gone */ + wait->private = NULL; spin_unlock(&poll->head->lock); if (!done) __io_async_wake(req, poll, mask, io_poll_task_func); @@ -4675,6 +4786,7 @@ static void io_async_task_func(struct callback_head *cb) if (io_poll_rewait(req, &apoll->poll)) { spin_unlock_irq(&ctx->completion_lock); + percpu_ref_put(&ctx->refs); return; } @@ -4682,7 +4794,7 @@ static void io_async_task_func(struct callback_head *cb) if (hash_hashed(&req->hash_node)) hash_del(&req->hash_node); - io_poll_remove_double(req, apoll->double_poll); + io_poll_remove_double(req); spin_unlock_irq(&ctx->completion_lock); if (!READ_ONCE(apoll->poll.canceled)) @@ -4690,6 +4802,7 @@ static void io_async_task_func(struct callback_head *cb) else __io_req_task_cancel(req, -ECANCELED); + percpu_ref_put(&ctx->refs); kfree(apoll->double_poll); kfree(apoll); } @@ -4791,8 +4904,8 @@ static bool io_arm_poll_handler(struct io_kiocb *req) ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask, io_async_wake); - if (ret) { - io_poll_remove_double(req, apoll->double_poll); + if (ret || ipt.error) { + io_poll_remove_double(req); spin_unlock_irq(&ctx->completion_lock); kfree(apoll->double_poll); kfree(apoll); @@ -4824,14 +4937,13 @@ static bool io_poll_remove_one(struct io_kiocb *req) { bool do_complete; + io_poll_remove_double(req); + if (req->opcode == IORING_OP_POLL_ADD) { - io_poll_remove_double(req, req->io); do_complete = __io_poll_remove_one(req, &req->poll); } else { struct async_poll *apoll = req->apoll; - io_poll_remove_double(req, apoll->double_poll); - /* non-poll requests have submit ref still */ do_complete = __io_poll_remove_one(req, &apoll->poll); if (do_complete) { @@ -4845,6 +4957,7 @@ static bool io_poll_remove_one(struct io_kiocb *req) io_cqring_fill_event(req, -ECANCELED); io_commit_cqring(req->ctx); req->flags |= REQ_F_COMP_LOCKED; + req_set_fail_links(req); io_put_req(req); } @@ -5017,6 +5130,23 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) return HRTIMER_NORESTART; } +static int __io_timeout_cancel(struct io_kiocb *req) +{ + int ret; + + list_del_init(&req->timeout.list); + + ret = hrtimer_try_to_cancel(&req->io->timeout.timer); + if (ret == -1) + return -EALREADY; + + req_set_fail_links(req); + req->flags |= REQ_F_COMP_LOCKED; + io_cqring_fill_event(req, -ECANCELED); + io_put_req(req); + return 0; +} + static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) { struct io_kiocb *req; @@ -5024,7 +5154,6 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) list_for_each_entry(req, &ctx->timeout_list, timeout.list) { if (user_data == req->user_data) { - list_del_init(&req->timeout.list); ret = 0; break; } @@ -5033,14 +5162,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) if (ret == -ENOENT) return ret; - ret = hrtimer_try_to_cancel(&req->io->timeout.timer); - if (ret == -1) - return -EALREADY; - - req_set_fail_links(req); - io_cqring_fill_event(req, -ECANCELED); - io_put_req(req); - return 0; + return __io_timeout_cancel(req); } static int io_timeout_remove_prep(struct io_kiocb *req, @@ -5481,8 +5603,8 @@ static void __io_clean_op(struct io_kiocb *req) case IORING_OP_WRITEV: case IORING_OP_WRITE_FIXED: case IORING_OP_WRITE: - if (io->rw.iov != io->rw.fast_iov) - kfree(io->rw.iov); + if (io->rw.free_iovec) + kfree(io->rw.free_iovec); break; case IORING_OP_RECVMSG: case IORING_OP_SENDMSG: @@ -5497,6 +5619,18 @@ static void __io_clean_op(struct io_kiocb *req) } req->flags &= ~REQ_F_NEED_CLEANUP; } + + if (req->flags & REQ_F_INFLIGHT) { + struct io_ring_ctx *ctx = req->ctx; + unsigned long flags; + + spin_lock_irqsave(&ctx->inflight_lock, flags); + list_del(&req->inflight_entry); + if (waitqueue_active(&ctx->inflight_wait)) + wake_up(&ctx->inflight_wait); + spin_unlock_irqrestore(&ctx->inflight_lock, flags); + req->flags &= ~REQ_F_INFLIGHT; + } } static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, @@ -5917,15 +6051,12 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer) return HRTIMER_NORESTART; } -static void io_queue_linked_timeout(struct io_kiocb *req) +static void __io_queue_linked_timeout(struct io_kiocb *req) { - struct io_ring_ctx *ctx = req->ctx; - /* * If the list is now empty, then our linked request finished before * we got a chance to setup the timer */ - spin_lock_irq(&ctx->completion_lock); if (!list_empty(&req->link_list)) { struct io_timeout_data *data = &req->io->timeout; @@ -5933,6 +6064,14 @@ static void io_queue_linked_timeout(struct io_kiocb *req) hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode); } +} + +static void io_queue_linked_timeout(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + + spin_lock_irq(&ctx->completion_lock); + __io_queue_linked_timeout(req); spin_unlock_irq(&ctx->completion_lock); /* drop submission reference */ @@ -7818,7 +7957,13 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) ACCT_LOCKED); INIT_WORK(&ctx->exit_work, io_ring_exit_work); - queue_work(system_wq, &ctx->exit_work); + /* + * Use system_unbound_wq to avoid spawning tons of event kworkers + * if we're exiting a ton of rings at the same time. It just adds + * noise and overhead, there's no discernable change in runtime + * over using system_wq. + */ + queue_work(system_unbound_wq, &ctx->exit_work); } static int io_uring_release(struct inode *inode, struct file *file) @@ -7837,6 +7982,98 @@ static bool io_wq_files_match(struct io_wq_work *work, void *data) return work->files == files; } +/* + * Returns true if 'preq' is the link parent of 'req' + */ +static bool io_match_link(struct io_kiocb *preq, struct io_kiocb *req) +{ + struct io_kiocb *link; + + if (!(preq->flags & REQ_F_LINK_HEAD)) + return false; + + list_for_each_entry(link, &preq->link_list, link_list) { + if (link == req) + return true; + } + + return false; +} + +/* + * We're looking to cancel 'req' because it's holding on to our files, but + * 'req' could be a link to another request. See if it is, and cancel that + * parent request if so. + */ +static bool io_poll_remove_link(struct io_ring_ctx *ctx, struct io_kiocb *req) +{ + struct hlist_node *tmp; + struct io_kiocb *preq; + bool found = false; + int i; + + spin_lock_irq(&ctx->completion_lock); + for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) { + struct hlist_head *list; + + list = &ctx->cancel_hash[i]; + hlist_for_each_entry_safe(preq, tmp, list, hash_node) { + found = io_match_link(preq, req); + if (found) { + io_poll_remove_one(preq); + break; + } + } + } + spin_unlock_irq(&ctx->completion_lock); + return found; +} + +static bool io_timeout_remove_link(struct io_ring_ctx *ctx, + struct io_kiocb *req) +{ + struct io_kiocb *preq; + bool found = false; + + spin_lock_irq(&ctx->completion_lock); + list_for_each_entry(preq, &ctx->timeout_list, timeout.list) { + found = io_match_link(preq, req); + if (found) { + __io_timeout_cancel(preq); + break; + } + } + spin_unlock_irq(&ctx->completion_lock); + return found; +} + +static bool io_cancel_link_cb(struct io_wq_work *work, void *data) +{ + return io_match_link(container_of(work, struct io_kiocb, work), data); +} + +static void io_attempt_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req) +{ + enum io_wq_cancel cret; + + /* cancel this particular work, if it's running */ + cret = io_wq_cancel_work(ctx->io_wq, &req->work); + if (cret != IO_WQ_CANCEL_NOTFOUND) + return; + + /* find links that hold this pending, cancel those */ + cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_link_cb, req, true); + if (cret != IO_WQ_CANCEL_NOTFOUND) + return; + + /* if we have a poll link holding this pending, cancel that */ + if (io_poll_remove_link(ctx, req)) + return; + + /* final option, timeout link is holding this req pending */ + io_timeout_remove_link(ctx, req); +} + static void io_uring_cancel_files(struct io_ring_ctx *ctx, struct files_struct *files) { @@ -7868,32 +8105,9 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, /* We need to keep going until we don't find a matching req */ if (!cancel_req) break; - - if (cancel_req->flags & REQ_F_OVERFLOW) { - spin_lock_irq(&ctx->completion_lock); - list_del(&cancel_req->compl.list); - cancel_req->flags &= ~REQ_F_OVERFLOW; - - io_cqring_mark_overflow(ctx); - WRITE_ONCE(ctx->rings->cq_overflow, - atomic_inc_return(&ctx->cached_cq_overflow)); - io_commit_cqring(ctx); - spin_unlock_irq(&ctx->completion_lock); - - /* - * Put inflight ref and overflow ref. If that's - * all we had, then we're done with this request. - */ - if (refcount_sub_and_test(2, &cancel_req->refs)) { - io_free_req(cancel_req); - finish_wait(&ctx->inflight_wait, &wait); - continue; - } - } else { - io_wq_cancel_work(ctx->io_wq, &cancel_req->work); - io_put_req(cancel_req); - } - + /* cancel this request, or head link requests */ + io_attempt_cancel(ctx, cancel_req); + io_put_req(cancel_req); schedule(); finish_wait(&ctx->inflight_wait, &wait); } @@ -8171,6 +8385,10 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, struct io_rings *rings; size_t size, sq_array_offset; + /* make sure these are sane, as we already accounted them */ + ctx->sq_entries = p->sq_entries; + ctx->cq_entries = p->cq_entries; + size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); if (size == SIZE_MAX) return -EOVERFLOW; @@ -8187,8 +8405,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, rings->cq_ring_entries = p->cq_entries; ctx->sq_mask = rings->sq_ring_mask; ctx->cq_mask = rings->cq_ring_mask; - ctx->sq_entries = rings->sq_ring_entries; - ctx->cq_entries = rings->cq_ring_entries; size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); if (size == SIZE_MAX) { @@ -8317,6 +8533,16 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ctx->user = user; ctx->creds = get_current_cred(); + /* + * Account memory _before_ installing the file descriptor. Once + * the descriptor is installed, it can get closed at any time. Also + * do this before hitting the general error path, as ring freeing + * will un-account as well. + */ + io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries), + ACCT_LOCKED); + ctx->limit_mem = limit_mem; + ret = io_allocate_scq_urings(ctx, p); if (ret) goto err; @@ -8354,14 +8580,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, } /* - * Account memory _before_ installing the file descriptor. Once - * the descriptor is installed, it can get closed at any time. - */ - io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries), - ACCT_LOCKED); - ctx->limit_mem = limit_mem; - - /* * Install ring fd as the very last thing, so we don't risk someone * having closed it before we finish setup */ |