aboutsummaryrefslogtreecommitdiff
path: root/fs/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io_uring.c')
-rw-r--r--fs/io_uring.c682
1 files changed, 450 insertions, 232 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 2a3af95be4ca..91e2cc8414f9 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -508,9 +508,9 @@ struct io_async_msghdr {
struct io_async_rw {
struct iovec fast_iov[UIO_FASTIOV];
- struct iovec *iov;
- ssize_t nr_segs;
- ssize_t size;
+ const struct iovec *free_iovec;
+ struct iov_iter iter;
+ size_t bytes_done;
struct wait_page_queue wpq;
};
@@ -540,7 +540,6 @@ enum {
REQ_F_ISREG_BIT,
REQ_F_COMP_LOCKED_BIT,
REQ_F_NEED_CLEANUP_BIT,
- REQ_F_OVERFLOW_BIT,
REQ_F_POLLED_BIT,
REQ_F_BUFFER_SELECTED_BIT,
REQ_F_NO_FILE_TABLE_BIT,
@@ -583,8 +582,6 @@ enum {
REQ_F_COMP_LOCKED = BIT(REQ_F_COMP_LOCKED_BIT),
/* needs cleanup */
REQ_F_NEED_CLEANUP = BIT(REQ_F_NEED_CLEANUP_BIT),
- /* in overflow list */
- REQ_F_OVERFLOW = BIT(REQ_F_OVERFLOW_BIT),
/* already went through poll handler */
REQ_F_POLLED = BIT(REQ_F_POLLED_BIT),
/* buffer already selected */
@@ -898,6 +895,7 @@ static void io_put_req(struct io_kiocb *req);
static void io_double_put_req(struct io_kiocb *req);
static void __io_double_put_req(struct io_kiocb *req);
static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
+static void __io_queue_linked_timeout(struct io_kiocb *req);
static void io_queue_linked_timeout(struct io_kiocb *req);
static int __io_sqe_files_update(struct io_ring_ctx *ctx,
struct io_uring_files_update *ip,
@@ -914,9 +912,9 @@ static void io_file_put_work(struct work_struct *work);
static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
struct iovec **iovec, struct iov_iter *iter,
bool needs_lock);
-static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
- struct iovec *iovec, struct iovec *fast_iov,
- struct iov_iter *iter);
+static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
+ const struct iovec *fast_iov,
+ struct iov_iter *iter, bool force);
static struct kmem_cache *req_cachep;
@@ -945,7 +943,8 @@ static void io_get_req_task(struct io_kiocb *req)
static inline void io_clean_op(struct io_kiocb *req)
{
- if (req->flags & (REQ_F_NEED_CLEANUP | REQ_F_BUFFER_SELECTED))
+ if (req->flags & (REQ_F_NEED_CLEANUP | REQ_F_BUFFER_SELECTED |
+ REQ_F_INFLIGHT))
__io_clean_op(req);
}
@@ -1107,10 +1106,16 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
}
}
-static void io_req_clean_work(struct io_kiocb *req)
+/*
+ * Returns true if we need to defer file table putting. This can only happen
+ * from the error path with REQ_F_COMP_LOCKED set.
+ */
+static bool io_req_clean_work(struct io_kiocb *req)
{
if (!(req->flags & REQ_F_WORK_INITIALIZED))
- return;
+ return false;
+
+ req->flags &= ~REQ_F_WORK_INITIALIZED;
if (req->work.mm) {
mmdrop(req->work.mm);
@@ -1123,6 +1128,9 @@ static void io_req_clean_work(struct io_kiocb *req)
if (req->work.fs) {
struct fs_struct *fs = req->work.fs;
+ if (req->flags & REQ_F_COMP_LOCKED)
+ return true;
+
spin_lock(&req->work.fs->lock);
if (--fs->users)
fs = NULL;
@@ -1131,7 +1139,8 @@ static void io_req_clean_work(struct io_kiocb *req)
free_fs_struct(fs);
req->work.fs = NULL;
}
- req->flags &= ~REQ_F_WORK_INITIALIZED;
+
+ return false;
}
static void io_prep_async_work(struct io_kiocb *req)
@@ -1179,7 +1188,7 @@ static void io_prep_async_link(struct io_kiocb *req)
io_prep_async_work(cur);
}
-static void __io_queue_async_work(struct io_kiocb *req)
+static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *link = io_prep_linked_timeout(req);
@@ -1187,16 +1196,19 @@ static void __io_queue_async_work(struct io_kiocb *req)
trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
&req->work, req->flags);
io_wq_enqueue(ctx->io_wq, &req->work);
-
- if (link)
- io_queue_linked_timeout(link);
+ return link;
}
static void io_queue_async_work(struct io_kiocb *req)
{
+ struct io_kiocb *link;
+
/* init ->work of the whole link before punting */
io_prep_async_link(req);
- __io_queue_async_work(req);
+ link = __io_queue_async_work(req);
+
+ if (link)
+ io_queue_linked_timeout(link);
}
static void io_kill_timeout(struct io_kiocb *req)
@@ -1229,12 +1241,19 @@ static void __io_queue_deferred(struct io_ring_ctx *ctx)
do {
struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
struct io_defer_entry, list);
+ struct io_kiocb *link;
if (req_need_defer(de->req, de->seq))
break;
list_del_init(&de->list);
/* punt-init is done before queueing for defer */
- __io_queue_async_work(de->req);
+ link = __io_queue_async_work(de->req);
+ if (link) {
+ __io_queue_linked_timeout(link);
+ /* drop submission reference */
+ link->flags |= REQ_F_COMP_LOCKED;
+ io_put_req(link);
+ }
kfree(de);
} while (!list_empty(&ctx->defer_list));
}
@@ -1345,7 +1364,6 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
compl.list);
list_move(&req->compl.list, &list);
- req->flags &= ~REQ_F_OVERFLOW;
if (cqe) {
WRITE_ONCE(cqe->user_data, req->user_data);
WRITE_ONCE(cqe->res, req->result);
@@ -1398,7 +1416,6 @@ static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
ctx->rings->sq_flags |= IORING_SQ_CQ_OVERFLOW;
}
io_clean_op(req);
- req->flags |= REQ_F_OVERFLOW;
req->result = res;
req->compl.cflags = cflags;
refcount_inc(&req->refs);
@@ -1533,7 +1550,7 @@ static inline void io_put_file(struct io_kiocb *req, struct file *file,
fput(file);
}
-static void io_dismantle_req(struct io_kiocb *req)
+static bool io_dismantle_req(struct io_kiocb *req)
{
io_clean_op(req);
@@ -1541,27 +1558,15 @@ static void io_dismantle_req(struct io_kiocb *req)
kfree(req->io);
if (req->file)
io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE));
- io_req_clean_work(req);
-
- if (req->flags & REQ_F_INFLIGHT) {
- struct io_ring_ctx *ctx = req->ctx;
- unsigned long flags;
- spin_lock_irqsave(&ctx->inflight_lock, flags);
- list_del(&req->inflight_entry);
- if (waitqueue_active(&ctx->inflight_wait))
- wake_up(&ctx->inflight_wait);
- spin_unlock_irqrestore(&ctx->inflight_lock, flags);
- }
+ return io_req_clean_work(req);
}
-static void __io_free_req(struct io_kiocb *req)
+static void __io_free_req_finish(struct io_kiocb *req)
{
- struct io_ring_ctx *ctx;
+ struct io_ring_ctx *ctx = req->ctx;
- io_dismantle_req(req);
__io_put_req_task(req);
- ctx = req->ctx;
if (likely(!io_is_fallback_req(req)))
kmem_cache_free(req_cachep, req);
else
@@ -1569,6 +1574,39 @@ static void __io_free_req(struct io_kiocb *req)
percpu_ref_put(&ctx->refs);
}
+static void io_req_task_file_table_put(struct callback_head *cb)
+{
+ struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+ struct fs_struct *fs = req->work.fs;
+
+ spin_lock(&req->work.fs->lock);
+ if (--fs->users)
+ fs = NULL;
+ spin_unlock(&req->work.fs->lock);
+ if (fs)
+ free_fs_struct(fs);
+ req->work.fs = NULL;
+ __io_free_req_finish(req);
+}
+
+static void __io_free_req(struct io_kiocb *req)
+{
+ if (!io_dismantle_req(req)) {
+ __io_free_req_finish(req);
+ } else {
+ int ret;
+
+ init_task_work(&req->task_work, io_req_task_file_table_put);
+ ret = task_work_add(req->task, &req->task_work, TWA_RESUME);
+ if (unlikely(ret)) {
+ struct task_struct *tsk;
+
+ tsk = io_wq_get_task(req->ctx->io_wq);
+ task_work_add(tsk, &req->task_work, 0);
+ }
+ }
+}
+
static bool io_link_cancel_timeout(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -1598,6 +1636,7 @@ static bool __io_kill_linked_timeout(struct io_kiocb *req)
return false;
list_del_init(&link->link_list);
+ link->flags |= REQ_F_COMP_LOCKED;
wake_ev = io_link_cancel_timeout(link);
req->flags &= ~REQ_F_LINK_TIMEOUT;
return wake_ev;
@@ -1656,6 +1695,7 @@ static void __io_fail_links(struct io_kiocb *req)
trace_io_uring_fail_link(req, link);
io_cqring_fill_event(link, -ECANCELED);
+ link->flags |= REQ_F_COMP_LOCKED;
__io_double_put_req(link);
req->flags &= ~REQ_F_LINK_TIMEOUT;
}
@@ -1710,22 +1750,22 @@ static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
{
struct task_struct *tsk = req->task;
struct io_ring_ctx *ctx = req->ctx;
- int ret, notify = TWA_RESUME;
+ int ret, notify;
/*
- * SQPOLL kernel thread doesn't need notification, just a wakeup.
- * If we're not using an eventfd, then TWA_RESUME is always fine,
- * as we won't have dependencies between request completions for
- * other kernel wait conditions.
+ * SQPOLL kernel thread doesn't need notification, just a wakeup. For
+ * all other cases, use TWA_SIGNAL unconditionally to ensure we're
+ * processing task_work. There's no reliable way to tell if TWA_RESUME
+ * will do the job.
*/
- if (ctx->flags & IORING_SETUP_SQPOLL)
- notify = 0;
- else if (ctx->cq_ev_fd)
+ notify = 0;
+ if (!(ctx->flags & IORING_SETUP_SQPOLL))
notify = TWA_SIGNAL;
ret = task_work_add(tsk, cb, notify);
if (!ret)
wake_up_process(tsk);
+
return ret;
}
@@ -1766,8 +1806,10 @@ static void __io_req_task_submit(struct io_kiocb *req)
static void io_req_task_submit(struct callback_head *cb)
{
struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+ struct io_ring_ctx *ctx = req->ctx;
__io_req_task_submit(req);
+ percpu_ref_put(&ctx->refs);
}
static void io_req_task_queue(struct io_kiocb *req)
@@ -1775,6 +1817,7 @@ static void io_req_task_queue(struct io_kiocb *req)
int ret;
init_task_work(&req->task_work, io_req_task_submit);
+ percpu_ref_get(&req->ctx->refs);
ret = io_req_task_work_add(req, &req->task_work);
if (unlikely(ret)) {
@@ -1855,7 +1898,7 @@ static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req)
req->flags &= ~REQ_F_TASK_PINNED;
}
- io_dismantle_req(req);
+ WARN_ON_ONCE(io_dismantle_req(req));
rb->reqs[rb->to_free++] = req;
if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
__io_req_free_batch_flush(req->ctx, rb);
@@ -2241,7 +2284,7 @@ static bool io_resubmit_prep(struct io_kiocb *req, int error)
ret = io_import_iovec(rw, req, &iovec, &iter, false);
if (ret < 0)
goto end_req;
- ret = io_setup_async_rw(req, ret, iovec, inline_vecs, &iter);
+ ret = io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
if (!ret)
return true;
kfree(iovec);
@@ -2263,6 +2306,8 @@ static void io_rw_resubmit(struct callback_head *cb)
refcount_inc(&req->refs);
io_queue_async_work(req);
}
+
+ percpu_ref_put(&ctx->refs);
}
#endif
@@ -2275,6 +2320,8 @@ static bool io_rw_reissue(struct io_kiocb *req, long res)
return false;
init_task_work(&req->task_work, io_rw_resubmit);
+ percpu_ref_get(&req->ctx->refs);
+
ret = io_req_task_work_add(req, &req->task_work);
if (!ret)
return true;
@@ -2527,6 +2574,14 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
+ /* add previously done IO, if any */
+ if (req->io && req->io->rw.bytes_done > 0) {
+ if (ret < 0)
+ ret = req->io->rw.bytes_done;
+ else
+ ret += req->io->rw.bytes_done;
+ }
+
if (req->flags & REQ_F_CUR_POS)
req->file->f_pos = kiocb->ki_pos;
if (ret >= 0 && kiocb->ki_complete == io_complete_rw)
@@ -2749,9 +2804,9 @@ static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
return __io_iov_buffer_select(req, iov, needs_lock);
}
-static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
- struct iovec **iovec, struct iov_iter *iter,
- bool needs_lock)
+static ssize_t __io_import_iovec(int rw, struct io_kiocb *req,
+ struct iovec **iovec, struct iov_iter *iter,
+ bool needs_lock)
{
void __user *buf = u64_to_user_ptr(req->rw.addr);
size_t sqe_len = req->rw.len;
@@ -2771,10 +2826,8 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
if (req->flags & REQ_F_BUFFER_SELECT) {
buf = io_rw_buffer_select(req, &sqe_len, needs_lock);
- if (IS_ERR(buf)) {
- *iovec = NULL;
+ if (IS_ERR(buf))
return PTR_ERR(buf);
- }
req->rw.len = sqe_len;
}
@@ -2783,14 +2836,6 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
return ret < 0 ? ret : sqe_len;
}
- if (req->io) {
- struct io_async_rw *iorw = &req->io->rw;
-
- iov_iter_init(iter, rw, iorw->iov, iorw->nr_segs, iorw->size);
- *iovec = NULL;
- return iorw->size;
- }
-
if (req->flags & REQ_F_BUFFER_SELECT) {
ret = io_iov_buffer_select(req, *iovec, needs_lock);
if (!ret) {
@@ -2810,6 +2855,16 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
}
+static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
+ struct iovec **iovec, struct iov_iter *iter,
+ bool needs_lock)
+{
+ if (!req->io)
+ return __io_import_iovec(rw, req, iovec, iter, needs_lock);
+ *iovec = NULL;
+ return iov_iter_count(&req->io->rw.iter);
+}
+
/*
* For files that don't have ->read_iter() and ->write_iter(), handle them
* by looping over ->read() or ->write() manually.
@@ -2868,21 +2923,30 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
return ret;
}
-static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
- struct iovec *iovec, struct iovec *fast_iov,
- struct iov_iter *iter)
+static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
+ const struct iovec *fast_iov, struct iov_iter *iter)
{
struct io_async_rw *rw = &req->io->rw;
- rw->nr_segs = iter->nr_segs;
- rw->size = io_size;
+ memcpy(&rw->iter, iter, sizeof(*iter));
+ rw->free_iovec = NULL;
+ rw->bytes_done = 0;
+ /* can only be fixed buffers, no need to do anything */
+ if (iter->type == ITER_BVEC)
+ return;
if (!iovec) {
- rw->iov = rw->fast_iov;
- if (rw->iov != fast_iov)
- memcpy(rw->iov, fast_iov,
+ unsigned iov_off = 0;
+
+ rw->iter.iov = rw->fast_iov;
+ if (iter->iov != fast_iov) {
+ iov_off = iter->iov - fast_iov;
+ rw->iter.iov += iov_off;
+ }
+ if (rw->fast_iov != fast_iov)
+ memcpy(rw->fast_iov + iov_off, fast_iov + iov_off,
sizeof(struct iovec) * iter->nr_segs);
} else {
- rw->iov = iovec;
+ rw->free_iovec = iovec;
req->flags |= REQ_F_NEED_CLEANUP;
}
}
@@ -2901,17 +2965,17 @@ static int io_alloc_async_ctx(struct io_kiocb *req)
return __io_alloc_async_ctx(req);
}
-static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
- struct iovec *iovec, struct iovec *fast_iov,
- struct iov_iter *iter)
+static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
+ const struct iovec *fast_iov,
+ struct iov_iter *iter, bool force)
{
- if (!io_op_defs[req->opcode].async_ctx)
+ if (!force && !io_op_defs[req->opcode].async_ctx)
return 0;
if (!req->io) {
if (__io_alloc_async_ctx(req))
return -ENOMEM;
- io_req_map_rw(req, io_size, iovec, fast_iov, iter);
+ io_req_map_rw(req, iovec, fast_iov, iter);
}
return 0;
}
@@ -2919,18 +2983,16 @@ static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
static inline int io_rw_prep_async(struct io_kiocb *req, int rw,
bool force_nonblock)
{
- struct io_async_ctx *io = req->io;
- struct iov_iter iter;
+ struct io_async_rw *iorw = &req->io->rw;
ssize_t ret;
- io->rw.iov = io->rw.fast_iov;
- req->io = NULL;
- ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock);
- req->io = io;
+ iorw->iter.iov = iorw->fast_iov;
+ ret = __io_import_iovec(rw, req, (struct iovec **) &iorw->iter.iov,
+ &iorw->iter, !force_nonblock);
if (unlikely(ret < 0))
return ret;
- io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
+ io_req_map_rw(req, iorw->iter.iov, iorw->fast_iov, &iorw->iter);
return 0;
}
@@ -2952,6 +3014,16 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
return io_rw_prep_async(req, READ, force_nonblock);
}
+/*
+ * This is our waitqueue callback handler, registered through lock_page_async()
+ * when we initially tried to do the IO with the iocb armed our waitqueue.
+ * This gets called when the page is unlocked, and we generally expect that to
+ * happen when the page IO is completed and the page is now uptodate. This will
+ * queue a task_work based retry of the operation, attempting to copy the data
+ * again. If the latter fails because the page was NOT uptodate, then we will
+ * do a thread based blocking retry of the operation. That's the unexpected
+ * slow path.
+ */
static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
int sync, void *arg)
{
@@ -2965,13 +3037,11 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
if (!wake_page_match(wpq, key))
return 0;
- /* Stop waking things up if the page is locked again */
- if (test_bit(key->bit_nr, &key->page->flags))
- return -1;
-
list_del_init(&wait->entry);
init_task_work(&req->task_work, io_req_task_submit);
+ percpu_ref_get(&req->ctx->refs);
+
/* submit ref gets dropped, acquire a new one */
refcount_inc(&req->refs);
ret = io_req_task_work_add(req, &req->task_work);
@@ -2987,40 +3057,31 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
return 1;
}
-static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
- struct wait_page_queue *wait,
- wait_queue_func_t func,
- void *data)
-{
- /* Can't support async wakeup with polled IO */
- if (kiocb->ki_flags & IOCB_HIPRI)
- return -EINVAL;
- if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) {
- wait->wait.func = func;
- wait->wait.private = data;
- wait->wait.flags = 0;
- INIT_LIST_HEAD(&wait->wait.entry);
- kiocb->ki_flags |= IOCB_WAITQ;
- kiocb->ki_waitq = wait;
- return 0;
- }
-
- return -EOPNOTSUPP;
-}
-
-
+/*
+ * This controls whether a given IO request should be armed for async page
+ * based retry. If we return false here, the request is handed to the async
+ * worker threads for retry. If we're doing buffered reads on a regular file,
+ * we prepare a private wait_page_queue entry and retry the operation. This
+ * will either succeed because the page is now uptodate and unlocked, or it
+ * will register a callback when the page is unlocked at IO completion. Through
+ * that callback, io_uring uses task_work to setup a retry of the operation.
+ * That retry will attempt the buffered read again. The retry will generally
+ * succeed, or in rare cases where it fails, we then fall back to using the
+ * async worker threads for a blocking retry.
+ */
static bool io_rw_should_retry(struct io_kiocb *req)
{
+ struct wait_page_queue *wait = &req->io->rw.wpq;
struct kiocb *kiocb = &req->rw.kiocb;
- int ret;
/* never retry for NOWAIT, we just complete with -EAGAIN */
if (req->flags & REQ_F_NOWAIT)
return false;
- /* already tried, or we're doing O_DIRECT */
- if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+ /* Only for buffered IO */
+ if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
return false;
+
/*
* just use poll if we can, and don't attempt if the fs doesn't
* support callback based unlocks
@@ -3028,28 +3089,25 @@ static bool io_rw_should_retry(struct io_kiocb *req)
if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
return false;
- /*
- * If request type doesn't require req->io to defer in general,
- * we need to allocate it here
- */
- if (!req->io && __io_alloc_async_ctx(req))
- return false;
+ wait->wait.func = io_async_buf_func;
+ wait->wait.private = req;
+ wait->wait.flags = 0;
+ INIT_LIST_HEAD(&wait->wait.entry);
+ kiocb->ki_flags |= IOCB_WAITQ;
+ kiocb->ki_waitq = wait;
- ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
- io_async_buf_func, req);
- if (!ret) {
- io_get_req_task(req);
- return true;
- }
-
- return false;
+ io_get_req_task(req);
+ return true;
}
static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
{
if (req->file->f_op->read_iter)
return call_read_iter(req->file, &req->rw.kiocb, iter);
- return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
+ else if (req->file->f_op->read)
+ return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
+ else
+ return -EINVAL;
}
static int io_read(struct io_kiocb *req, bool force_nonblock,
@@ -3057,16 +3115,19 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw.kiocb;
- struct iov_iter iter;
- size_t iov_count;
+ struct iov_iter __iter, *iter = &__iter;
ssize_t io_size, ret, ret2;
- unsigned long nr_segs;
+ size_t iov_count;
- ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock);
+ if (req->io)
+ iter = &req->io->rw.iter;
+
+ ret = io_import_iovec(READ, req, &iovec, iter, !force_nonblock);
if (ret < 0)
return ret;
io_size = ret;
req->result = io_size;
+ ret = 0;
/* Ensure we clear previously set non-block flag */
if (!force_nonblock)
@@ -3076,41 +3137,72 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
if (force_nonblock && !io_file_supports_async(req->file, READ))
goto copy_iov;
- iov_count = iov_iter_count(&iter);
- nr_segs = iter.nr_segs;
+ iov_count = iov_iter_count(iter);
ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
if (unlikely(ret))
goto out_free;
- ret2 = io_iter_do_read(req, &iter);
+ ret = io_iter_do_read(req, iter);
- /* Catch -EAGAIN return for forced non-blocking submission */
- if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
- kiocb_done(kiocb, ret2, cs);
- } else {
- iter.count = iov_count;
- iter.nr_segs = nr_segs;
-copy_iov:
- ret = io_setup_async_rw(req, io_size, iovec, inline_vecs,
- &iter);
+ if (!ret) {
+ goto done;
+ } else if (ret == -EIOCBQUEUED) {
+ ret = 0;
+ goto out_free;
+ } else if (ret == -EAGAIN) {
+ if (!force_nonblock)
+ goto done;
+ ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
if (ret)
goto out_free;
- /* it's copied and will be cleaned with ->io */
- iovec = NULL;
- /* if we can retry, do so with the callbacks armed */
- if (io_rw_should_retry(req)) {
- ret2 = io_iter_do_read(req, &iter);
- if (ret2 == -EIOCBQUEUED) {
- goto out_free;
- } else if (ret2 != -EAGAIN) {
- kiocb_done(kiocb, ret2, cs);
- goto out_free;
- }
- }
+ return -EAGAIN;
+ } else if (ret < 0) {
+ goto out_free;
+ }
+
+ /* read it all, or we did blocking attempt. no retry. */
+ if (!iov_iter_count(iter) || !force_nonblock ||
+ (req->file->f_flags & O_NONBLOCK))
+ goto done;
+
+ io_size -= ret;
+copy_iov:
+ ret2 = io_setup_async_rw(req, iovec, inline_vecs, iter, true);
+ if (ret2) {
+ ret = ret2;
+ goto out_free;
+ }
+ /* it's copied and will be cleaned with ->io */
+ iovec = NULL;
+ /* now use our persistent iterator, if we aren't already */
+ iter = &req->io->rw.iter;
+retry:
+ req->io->rw.bytes_done += ret;
+ /* if we can retry, do so with the callbacks armed */
+ if (!io_rw_should_retry(req)) {
kiocb->ki_flags &= ~IOCB_WAITQ;
return -EAGAIN;
}
+
+ /*
+ * Now retry read with the IOCB_WAITQ parts set in the iocb. If we
+ * get -EIOCBQUEUED, then we'll get a notification when the desired
+ * page gets unlocked. We can also get a partial read here, and if we
+ * do, then just retry at the new offset.
+ */
+ ret = io_iter_do_read(req, iter);
+ if (ret == -EIOCBQUEUED) {
+ ret = 0;
+ goto out_free;
+ } else if (ret > 0 && ret < io_size) {
+ /* we got some bytes, but not all. retry. */
+ goto retry;
+ }
+done:
+ kiocb_done(kiocb, ret, cs);
+ ret = 0;
out_free:
+ /* it's reportedly faster than delegating the null check to kfree() */
if (iovec)
kfree(iovec);
return ret;
@@ -3139,12 +3231,14 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw.kiocb;
- struct iov_iter iter;
+ struct iov_iter __iter, *iter = &__iter;
size_t iov_count;
ssize_t ret, ret2, io_size;
- unsigned long nr_segs;
- ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock);
+ if (req->io)
+ iter = &req->io->rw.iter;
+
+ ret = io_import_iovec(WRITE, req, &iovec, iter, !force_nonblock);
if (ret < 0)
return ret;
io_size = ret;
@@ -3163,8 +3257,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
(req->flags & REQ_F_ISREG))
goto copy_iov;
- iov_count = iov_iter_count(&iter);
- nr_segs = iter.nr_segs;
+ iov_count = iov_iter_count(iter);
ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
if (unlikely(ret))
goto out_free;
@@ -3185,9 +3278,11 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
kiocb->ki_flags |= IOCB_WRITE;
if (req->file->f_op->write_iter)
- ret2 = call_write_iter(req->file, kiocb, &iter);
+ ret2 = call_write_iter(req->file, kiocb, iter);
+ else if (req->file->f_op->write)
+ ret2 = loop_rw_iter(WRITE, req->file, kiocb, iter);
else
- ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
+ ret2 = -EINVAL;
/*
* Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
@@ -3198,18 +3293,13 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
if (!force_nonblock || ret2 != -EAGAIN) {
kiocb_done(kiocb, ret2, cs);
} else {
- iter.count = iov_count;
- iter.nr_segs = nr_segs;
copy_iov:
- ret = io_setup_async_rw(req, io_size, iovec, inline_vecs,
- &iter);
- if (ret)
- goto out_free;
- /* it's copied and will be cleaned with ->io */
- iovec = NULL;
- return -EAGAIN;
+ ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
+ if (!ret)
+ return -EAGAIN;
}
out_free:
+ /* it's reportedly faster than delegating the null check to kfree() */
if (iovec)
kfree(iovec);
return ret;
@@ -4488,6 +4578,8 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
req->result = mask;
init_task_work(&req->task_work, func);
+ percpu_ref_get(&req->ctx->refs);
+
/*
* If this fails, then the task is exiting. When a task exits, the
* work gets canceled, so just cancel this request as well instead
@@ -4526,9 +4618,24 @@ static bool io_poll_rewait(struct io_kiocb *req, struct io_poll_iocb *poll)
return false;
}
-static void io_poll_remove_double(struct io_kiocb *req, void *data)
+static struct io_poll_iocb *io_poll_get_double(struct io_kiocb *req)
{
- struct io_poll_iocb *poll = data;
+ /* pure poll stashes this in ->io, poll driven retry elsewhere */
+ if (req->opcode == IORING_OP_POLL_ADD)
+ return (struct io_poll_iocb *) req->io;
+ return req->apoll->double_poll;
+}
+
+static struct io_poll_iocb *io_poll_get_single(struct io_kiocb *req)
+{
+ if (req->opcode == IORING_OP_POLL_ADD)
+ return &req->poll;
+ return &req->apoll->poll;
+}
+
+static void io_poll_remove_double(struct io_kiocb *req)
+{
+ struct io_poll_iocb *poll = io_poll_get_double(req);
lockdep_assert_held(&req->ctx->completion_lock);
@@ -4548,7 +4655,7 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
{
struct io_ring_ctx *ctx = req->ctx;
- io_poll_remove_double(req, req->io);
+ io_poll_remove_double(req);
req->poll.done = true;
io_cqring_fill_event(req, error ? error : mangle_poll(mask));
io_commit_cqring(ctx);
@@ -4575,18 +4682,20 @@ static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt)
static void io_poll_task_func(struct callback_head *cb)
{
struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+ struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *nxt = NULL;
io_poll_task_handler(req, &nxt);
if (nxt)
__io_req_task_submit(nxt);
+ percpu_ref_put(&ctx->refs);
}
static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
struct io_kiocb *req = wait->private;
- struct io_poll_iocb *poll = req->apoll->double_poll;
+ struct io_poll_iocb *poll = io_poll_get_single(req);
__poll_t mask = key_to_poll(key);
/* for instances that support it check for an event match first: */
@@ -4600,6 +4709,8 @@ static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode,
done = list_empty(&poll->wait.entry);
if (!done)
list_del_init(&poll->wait.entry);
+ /* make sure double remove sees this as being gone */
+ wait->private = NULL;
spin_unlock(&poll->head->lock);
if (!done)
__io_async_wake(req, poll, mask, io_poll_task_func);
@@ -4675,6 +4786,7 @@ static void io_async_task_func(struct callback_head *cb)
if (io_poll_rewait(req, &apoll->poll)) {
spin_unlock_irq(&ctx->completion_lock);
+ percpu_ref_put(&ctx->refs);
return;
}
@@ -4682,7 +4794,7 @@ static void io_async_task_func(struct callback_head *cb)
if (hash_hashed(&req->hash_node))
hash_del(&req->hash_node);
- io_poll_remove_double(req, apoll->double_poll);
+ io_poll_remove_double(req);
spin_unlock_irq(&ctx->completion_lock);
if (!READ_ONCE(apoll->poll.canceled))
@@ -4690,6 +4802,7 @@ static void io_async_task_func(struct callback_head *cb)
else
__io_req_task_cancel(req, -ECANCELED);
+ percpu_ref_put(&ctx->refs);
kfree(apoll->double_poll);
kfree(apoll);
}
@@ -4791,8 +4904,8 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask,
io_async_wake);
- if (ret) {
- io_poll_remove_double(req, apoll->double_poll);
+ if (ret || ipt.error) {
+ io_poll_remove_double(req);
spin_unlock_irq(&ctx->completion_lock);
kfree(apoll->double_poll);
kfree(apoll);
@@ -4824,14 +4937,13 @@ static bool io_poll_remove_one(struct io_kiocb *req)
{
bool do_complete;
+ io_poll_remove_double(req);
+
if (req->opcode == IORING_OP_POLL_ADD) {
- io_poll_remove_double(req, req->io);
do_complete = __io_poll_remove_one(req, &req->poll);
} else {
struct async_poll *apoll = req->apoll;
- io_poll_remove_double(req, apoll->double_poll);
-
/* non-poll requests have submit ref still */
do_complete = __io_poll_remove_one(req, &apoll->poll);
if (do_complete) {
@@ -4845,6 +4957,7 @@ static bool io_poll_remove_one(struct io_kiocb *req)
io_cqring_fill_event(req, -ECANCELED);
io_commit_cqring(req->ctx);
req->flags |= REQ_F_COMP_LOCKED;
+ req_set_fail_links(req);
io_put_req(req);
}
@@ -5017,6 +5130,23 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
return HRTIMER_NORESTART;
}
+static int __io_timeout_cancel(struct io_kiocb *req)
+{
+ int ret;
+
+ list_del_init(&req->timeout.list);
+
+ ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
+ if (ret == -1)
+ return -EALREADY;
+
+ req_set_fail_links(req);
+ req->flags |= REQ_F_COMP_LOCKED;
+ io_cqring_fill_event(req, -ECANCELED);
+ io_put_req(req);
+ return 0;
+}
+
static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
{
struct io_kiocb *req;
@@ -5024,7 +5154,6 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
list_for_each_entry(req, &ctx->timeout_list, timeout.list) {
if (user_data == req->user_data) {
- list_del_init(&req->timeout.list);
ret = 0;
break;
}
@@ -5033,14 +5162,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
if (ret == -ENOENT)
return ret;
- ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
- if (ret == -1)
- return -EALREADY;
-
- req_set_fail_links(req);
- io_cqring_fill_event(req, -ECANCELED);
- io_put_req(req);
- return 0;
+ return __io_timeout_cancel(req);
}
static int io_timeout_remove_prep(struct io_kiocb *req,
@@ -5481,8 +5603,8 @@ static void __io_clean_op(struct io_kiocb *req)
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
case IORING_OP_WRITE:
- if (io->rw.iov != io->rw.fast_iov)
- kfree(io->rw.iov);
+ if (io->rw.free_iovec)
+ kfree(io->rw.free_iovec);
break;
case IORING_OP_RECVMSG:
case IORING_OP_SENDMSG:
@@ -5497,6 +5619,18 @@ static void __io_clean_op(struct io_kiocb *req)
}
req->flags &= ~REQ_F_NEED_CLEANUP;
}
+
+ if (req->flags & REQ_F_INFLIGHT) {
+ struct io_ring_ctx *ctx = req->ctx;
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->inflight_lock, flags);
+ list_del(&req->inflight_entry);
+ if (waitqueue_active(&ctx->inflight_wait))
+ wake_up(&ctx->inflight_wait);
+ spin_unlock_irqrestore(&ctx->inflight_lock, flags);
+ req->flags &= ~REQ_F_INFLIGHT;
+ }
}
static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
@@ -5917,15 +6051,12 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
return HRTIMER_NORESTART;
}
-static void io_queue_linked_timeout(struct io_kiocb *req)
+static void __io_queue_linked_timeout(struct io_kiocb *req)
{
- struct io_ring_ctx *ctx = req->ctx;
-
/*
* If the list is now empty, then our linked request finished before
* we got a chance to setup the timer
*/
- spin_lock_irq(&ctx->completion_lock);
if (!list_empty(&req->link_list)) {
struct io_timeout_data *data = &req->io->timeout;
@@ -5933,6 +6064,14 @@ static void io_queue_linked_timeout(struct io_kiocb *req)
hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
data->mode);
}
+}
+
+static void io_queue_linked_timeout(struct io_kiocb *req)
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ spin_lock_irq(&ctx->completion_lock);
+ __io_queue_linked_timeout(req);
spin_unlock_irq(&ctx->completion_lock);
/* drop submission reference */
@@ -7818,7 +7957,13 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
ACCT_LOCKED);
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
- queue_work(system_wq, &ctx->exit_work);
+ /*
+ * Use system_unbound_wq to avoid spawning tons of event kworkers
+ * if we're exiting a ton of rings at the same time. It just adds
+ * noise and overhead, there's no discernable change in runtime
+ * over using system_wq.
+ */
+ queue_work(system_unbound_wq, &ctx->exit_work);
}
static int io_uring_release(struct inode *inode, struct file *file)
@@ -7837,6 +7982,98 @@ static bool io_wq_files_match(struct io_wq_work *work, void *data)
return work->files == files;
}
+/*
+ * Returns true if 'preq' is the link parent of 'req'
+ */
+static bool io_match_link(struct io_kiocb *preq, struct io_kiocb *req)
+{
+ struct io_kiocb *link;
+
+ if (!(preq->flags & REQ_F_LINK_HEAD))
+ return false;
+
+ list_for_each_entry(link, &preq->link_list, link_list) {
+ if (link == req)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * We're looking to cancel 'req' because it's holding on to our files, but
+ * 'req' could be a link to another request. See if it is, and cancel that
+ * parent request if so.
+ */
+static bool io_poll_remove_link(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ struct hlist_node *tmp;
+ struct io_kiocb *preq;
+ bool found = false;
+ int i;
+
+ spin_lock_irq(&ctx->completion_lock);
+ for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
+ struct hlist_head *list;
+
+ list = &ctx->cancel_hash[i];
+ hlist_for_each_entry_safe(preq, tmp, list, hash_node) {
+ found = io_match_link(preq, req);
+ if (found) {
+ io_poll_remove_one(preq);
+ break;
+ }
+ }
+ }
+ spin_unlock_irq(&ctx->completion_lock);
+ return found;
+}
+
+static bool io_timeout_remove_link(struct io_ring_ctx *ctx,
+ struct io_kiocb *req)
+{
+ struct io_kiocb *preq;
+ bool found = false;
+
+ spin_lock_irq(&ctx->completion_lock);
+ list_for_each_entry(preq, &ctx->timeout_list, timeout.list) {
+ found = io_match_link(preq, req);
+ if (found) {
+ __io_timeout_cancel(preq);
+ break;
+ }
+ }
+ spin_unlock_irq(&ctx->completion_lock);
+ return found;
+}
+
+static bool io_cancel_link_cb(struct io_wq_work *work, void *data)
+{
+ return io_match_link(container_of(work, struct io_kiocb, work), data);
+}
+
+static void io_attempt_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ enum io_wq_cancel cret;
+
+ /* cancel this particular work, if it's running */
+ cret = io_wq_cancel_work(ctx->io_wq, &req->work);
+ if (cret != IO_WQ_CANCEL_NOTFOUND)
+ return;
+
+ /* find links that hold this pending, cancel those */
+ cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_link_cb, req, true);
+ if (cret != IO_WQ_CANCEL_NOTFOUND)
+ return;
+
+ /* if we have a poll link holding this pending, cancel that */
+ if (io_poll_remove_link(ctx, req))
+ return;
+
+ /* final option, timeout link is holding this req pending */
+ io_timeout_remove_link(ctx, req);
+}
+
static void io_uring_cancel_files(struct io_ring_ctx *ctx,
struct files_struct *files)
{
@@ -7868,32 +8105,9 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
/* We need to keep going until we don't find a matching req */
if (!cancel_req)
break;
-
- if (cancel_req->flags & REQ_F_OVERFLOW) {
- spin_lock_irq(&ctx->completion_lock);
- list_del(&cancel_req->compl.list);
- cancel_req->flags &= ~REQ_F_OVERFLOW;
-
- io_cqring_mark_overflow(ctx);
- WRITE_ONCE(ctx->rings->cq_overflow,
- atomic_inc_return(&ctx->cached_cq_overflow));
- io_commit_cqring(ctx);
- spin_unlock_irq(&ctx->completion_lock);
-
- /*
- * Put inflight ref and overflow ref. If that's
- * all we had, then we're done with this request.
- */
- if (refcount_sub_and_test(2, &cancel_req->refs)) {
- io_free_req(cancel_req);
- finish_wait(&ctx->inflight_wait, &wait);
- continue;
- }
- } else {
- io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
- io_put_req(cancel_req);
- }
-
+ /* cancel this request, or head link requests */
+ io_attempt_cancel(ctx, cancel_req);
+ io_put_req(cancel_req);
schedule();
finish_wait(&ctx->inflight_wait, &wait);
}
@@ -8171,6 +8385,10 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
struct io_rings *rings;
size_t size, sq_array_offset;
+ /* make sure these are sane, as we already accounted them */
+ ctx->sq_entries = p->sq_entries;
+ ctx->cq_entries = p->cq_entries;
+
size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
if (size == SIZE_MAX)
return -EOVERFLOW;
@@ -8187,8 +8405,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
rings->cq_ring_entries = p->cq_entries;
ctx->sq_mask = rings->sq_ring_mask;
ctx->cq_mask = rings->cq_ring_mask;
- ctx->sq_entries = rings->sq_ring_entries;
- ctx->cq_entries = rings->cq_ring_entries;
size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
if (size == SIZE_MAX) {
@@ -8317,6 +8533,16 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
ctx->user = user;
ctx->creds = get_current_cred();
+ /*
+ * Account memory _before_ installing the file descriptor. Once
+ * the descriptor is installed, it can get closed at any time. Also
+ * do this before hitting the general error path, as ring freeing
+ * will un-account as well.
+ */
+ io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries),
+ ACCT_LOCKED);
+ ctx->limit_mem = limit_mem;
+
ret = io_allocate_scq_urings(ctx, p);
if (ret)
goto err;
@@ -8354,14 +8580,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
}
/*
- * Account memory _before_ installing the file descriptor. Once
- * the descriptor is installed, it can get closed at any time.
- */
- io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries),
- ACCT_LOCKED);
- ctx->limit_mem = limit_mem;
-
- /*
* Install ring fd as the very last thing, so we don't risk someone
* having closed it before we finish setup
*/