aboutsummaryrefslogtreecommitdiff
path: root/io_uring/poll.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/poll.c')
-rw-r--r--io_uring/poll.c181
1 files changed, 114 insertions, 67 deletions
diff --git a/io_uring/poll.c b/io_uring/poll.c
index 0d9f49c575e0..ee7da6150ec4 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -40,7 +40,14 @@ struct io_poll_table {
};
#define IO_POLL_CANCEL_FLAG BIT(31)
-#define IO_POLL_REF_MASK GENMASK(30, 0)
+#define IO_POLL_RETRY_FLAG BIT(30)
+#define IO_POLL_REF_MASK GENMASK(29, 0)
+
+/*
+ * We usually have 1-2 refs taken, 128 is more than enough and we want to
+ * maximise the margin between this amount and the moment when it overflows.
+ */
+#define IO_POLL_REF_BIAS 128
#define IO_WQE_F_DOUBLE 1
@@ -58,6 +65,21 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
return priv & IO_WQE_F_DOUBLE;
}
+static bool io_poll_get_ownership_slowpath(struct io_kiocb *req)
+{
+ int v;
+
+ /*
+ * poll_refs are already elevated and we don't have much hope for
+ * grabbing the ownership. Instead of incrementing set a retry flag
+ * to notify the loop that there might have been some change.
+ */
+ v = atomic_fetch_or(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ if (v & IO_POLL_REF_MASK)
+ return false;
+ return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
+}
+
/*
* If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
* bump it and acquire ownership. It's disallowed to modify requests while not
@@ -66,6 +88,8 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
*/
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
+ if (unlikely(atomic_read(&req->poll_refs) >= IO_POLL_REF_BIAS))
+ return io_poll_get_ownership_slowpath(req);
return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
@@ -116,6 +140,8 @@ static void io_poll_req_insert_locked(struct io_kiocb *req)
struct io_hash_table *table = &req->ctx->cancel_table_locked;
u32 index = hash_long(req->cqe.user_data, table->hash_bits);
+ lockdep_assert_held(&req->ctx->uring_lock);
+
hlist_add_head(&req->hash_node, &table->hbs[index].list);
}
@@ -211,7 +237,6 @@ enum {
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
{
- struct io_ring_ctx *ctx = req->ctx;
int v, ret;
/* req->task == current here, checking PF_EXITING is safe */
@@ -221,11 +246,31 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
do {
v = atomic_read(&req->poll_refs);
- /* tw handler should be the owner, and so have some references */
- if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
- return IOU_POLL_DONE;
- if (v & IO_POLL_CANCEL_FLAG)
- return -ECANCELED;
+ if (unlikely(v != 1)) {
+ /* tw should be the owner and so have some refs */
+ if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
+ return IOU_POLL_NO_ACTION;
+ if (v & IO_POLL_CANCEL_FLAG)
+ return -ECANCELED;
+ /*
+ * cqe.res contains only events of the first wake up
+ * and all others are to be lost. Redo vfs_poll() to get
+ * up to date state.
+ */
+ if ((v & IO_POLL_REF_MASK) != 1)
+ req->cqe.res = 0;
+
+ if (v & IO_POLL_RETRY_FLAG) {
+ req->cqe.res = 0;
+ /*
+ * We won't find new events that came in between
+ * vfs_poll and the ref put unless we clear the
+ * flag in advance.
+ */
+ atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ v &= ~IO_POLL_RETRY_FLAG;
+ }
+ }
/* the mask was stashed in __io_poll_execute */
if (!req->cqe.res) {
@@ -243,8 +288,8 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
- if (!io_post_aux_cqe(ctx, req->cqe.user_data,
- mask, IORING_CQE_F_MORE, false)) {
+ if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
+ mask, IORING_CQE_F_MORE, false)) {
io_req_set_res(req, mask, 0);
return IOU_POLL_REMOVE_POLL_USE_RES;
}
@@ -256,11 +301,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
return ret;
}
+ /* force the next iteration to vfs_poll() */
+ req->cqe.res = 0;
+
/*
* Release all references, retry if someone tried to restart
* task_work while we were executing it.
*/
- } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
+ } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs) &
+ IO_POLL_REF_MASK);
return IOU_POLL_NO_ACTION;
}
@@ -272,54 +321,38 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
ret = io_poll_check_events(req, locked);
if (ret == IOU_POLL_NO_ACTION)
return;
-
- if (ret == IOU_POLL_DONE) {
- struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
- req->cqe.res = mangle_poll(req->cqe.res & poll->events);
- } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
- req->cqe.res = ret;
- req_set_fail(req);
- }
-
io_poll_remove_entries(req);
io_poll_tw_hash_eject(req, locked);
- io_req_set_res(req, req->cqe.res, 0);
- io_req_task_complete(req, locked);
-}
-
-static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
-{
- int ret;
-
- ret = io_poll_check_events(req, locked);
- if (ret == IOU_POLL_NO_ACTION)
- return;
+ if (req->opcode == IORING_OP_POLL_ADD) {
+ if (ret == IOU_POLL_DONE) {
+ struct io_poll *poll;
- io_poll_remove_entries(req);
- io_poll_tw_hash_eject(req, locked);
+ poll = io_kiocb_to_cmd(req, struct io_poll);
+ req->cqe.res = mangle_poll(req->cqe.res & poll->events);
+ } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
+ req->cqe.res = ret;
+ req_set_fail(req);
+ }
- if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
- io_req_complete_post(req);
- else if (ret == IOU_POLL_DONE)
- io_req_task_submit(req, locked);
- else
- io_req_complete_failed(req, ret);
+ io_req_set_res(req, req->cqe.res, 0);
+ io_req_task_complete(req, locked);
+ } else {
+ io_tw_lock(req->ctx, locked);
+
+ if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
+ io_req_task_complete(req, locked);
+ else if (ret == IOU_POLL_DONE)
+ io_req_task_submit(req, locked);
+ else
+ io_req_defer_failed(req, ret);
+ }
}
static void __io_poll_execute(struct io_kiocb *req, int mask)
{
io_req_set_res(req, mask, 0);
- /*
- * This is useful for poll that is armed on behalf of another
- * request, and where the wakeup path could be on a different
- * CPU. We want to avoid pulling in req->apoll->events for that
- * case.
- */
- if (req->opcode == IORING_OP_POLL_ADD)
- req->io_task_work.func = io_poll_task_func;
- else
- req->io_task_work.func = io_apoll_task_func;
+ req->io_task_work.func = io_poll_task_func;
trace_io_uring_task_add(req, mask);
io_req_task_work_add(req);
@@ -380,6 +413,14 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
return 0;
if (io_poll_get_ownership(req)) {
+ /*
+ * If we trigger a multishot poll off our own wakeup path,
+ * disable multishot as there is a circular dependency between
+ * CQ posting and triggering the event.
+ */
+ if (mask & EPOLL_URING_WAKE)
+ poll->events |= EPOLLONESHOT;
+
/* optional, saves extra locking for removal in tw handler */
if (mask && poll->events & EPOLLONESHOT) {
list_del_init(&poll->wait.entry);
@@ -394,7 +435,8 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
return 1;
}
-static void io_poll_double_prepare(struct io_kiocb *req)
+/* fails only when polling is already completing by the first entry */
+static bool io_poll_double_prepare(struct io_kiocb *req)
{
struct wait_queue_head *head;
struct io_poll *poll = io_poll_get_single(req);
@@ -403,20 +445,20 @@ static void io_poll_double_prepare(struct io_kiocb *req)
rcu_read_lock();
head = smp_load_acquire(&poll->head);
/*
- * poll arm may not hold ownership and so race with
- * io_poll_wake() by modifying req->flags. There is only one
- * poll entry queued, serialise with it by taking its head lock.
+ * poll arm might not hold ownership and so race for req->flags with
+ * io_poll_wake(). There is only one poll entry queued, serialise with
+ * it by taking its head lock. As we're still arming the tw hanlder
+ * is not going to be run, so there are no races with it.
*/
- if (head)
+ if (head) {
spin_lock_irq(&head->lock);
-
- req->flags |= REQ_F_DOUBLE_POLL;
- if (req->opcode == IORING_OP_POLL_ADD)
- req->flags |= REQ_F_ASYNC_DATA;
-
- if (head)
+ req->flags |= REQ_F_DOUBLE_POLL;
+ if (req->opcode == IORING_OP_POLL_ADD)
+ req->flags |= REQ_F_ASYNC_DATA;
spin_unlock_irq(&head->lock);
+ }
rcu_read_unlock();
+ return !!head;
}
static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
@@ -454,7 +496,11 @@ static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
/* mark as double wq entry */
wqe_private |= IO_WQE_F_DOUBLE;
io_init_poll_iocb(poll, first->events, first->wait.func);
- io_poll_double_prepare(req);
+ if (!io_poll_double_prepare(req)) {
+ /* the request is completing, just back off */
+ kfree(poll);
+ return;
+ }
*poll_ptr = poll;
} else {
/* fine to modify, there is no poll queued to race with us */
@@ -499,7 +545,6 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
- int v;
INIT_HLIST_NODE(&req->hash_node);
req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
@@ -567,11 +612,10 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
if (ipt->owning) {
/*
- * Release ownership. If someone tried to queue a tw while it was
- * locked, kick it off for them.
+ * Try to release ownership. If we see a change of state, e.g.
+ * poll was waken up, queue up a tw, it'll deal with it.
*/
- v = atomic_dec_return(&req->poll_refs);
- if (unlikely(v & IO_POLL_REF_MASK))
+ if (atomic_cmpxchg(&req->poll_refs, 1, 0) != 1)
__io_poll_execute(req, 0);
}
return 0;
@@ -596,10 +640,13 @@ static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req,
if (req->flags & REQ_F_POLLED) {
apoll = req->apoll;
kfree(apoll->double_poll);
- } else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
- (entry = io_alloc_cache_get(&ctx->apoll_cache)) != NULL) {
+ } else if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+ entry = io_alloc_cache_get(&ctx->apoll_cache);
+ if (entry == NULL)
+ goto alloc_apoll;
apoll = container_of(entry, struct async_poll, cache);
} else {
+alloc_apoll:
apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
if (unlikely(!apoll))
return NULL;