Diffstat (limited to 'fs/io_uring.c')
-rw-r--r--	fs/io_uring.c	593
1 file changed, 393 insertions(+), 200 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c index d542f1cf4428..0dadbdbead0f 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -75,7 +75,7 @@  #include "internal.h" -#define IORING_MAX_ENTRIES	4096 +#define IORING_MAX_ENTRIES	32768  #define IORING_MAX_FIXED_FILES	1024  struct io_uring { @@ -84,27 +84,29 @@ struct io_uring {  };  /* - * This data is shared with the application through the mmap at offset - * IORING_OFF_SQ_RING. + * This data is shared with the application through the mmap at offsets + * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.   *   * The offsets to the member fields are published through struct   * io_sqring_offsets when calling io_uring_setup.   */ -struct io_sq_ring { +struct io_rings {  	/*  	 * Head and tail offsets into the ring; the offsets need to be  	 * masked to get valid indices.  	 * -	 * The kernel controls head and the application controls tail. +	 * The kernel controls head of the sq ring and the tail of the cq ring, +	 * and the application controls tail of the sq ring and the head of the +	 * cq ring.  	 */ -	struct io_uring		r; +	struct io_uring		sq, cq;  	/* -	 * Bitmask to apply to head and tail offsets (constant, equals +	 * Bitmasks to apply to head and tail offsets (constant, equals  	 * ring_entries - 1)  	 */ -	u32			ring_mask; -	/* Ring size (constant, power of 2) */ -	u32			ring_entries; +	u32			sq_ring_mask, cq_ring_mask; +	/* Ring sizes (constant, power of 2) */ +	u32			sq_ring_entries, cq_ring_entries;  	/*  	 * Number of invalid entries dropped by the kernel due to  	 * invalid index stored in array @@ -117,7 +119,7 @@ struct io_sq_ring {  	 * counter includes all submissions that were dropped reaching  	 * the new SQ head (and possibly more).  	 */ -	u32			dropped; +	u32			sq_dropped;  	/*  	 * Runtime flags  	 * @@ -127,43 +129,7 @@ struct io_sq_ring {  	 * The application needs a full memory barrier before checking  	 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.  	 */ -	u32			flags; -	/* -	 * Ring buffer of indices into array of io_uring_sqe, which is -	 * mmapped by the application using the IORING_OFF_SQES offset. -	 * -	 * This indirection could e.g. be used to assign fixed -	 * io_uring_sqe entries to operations and only submit them to -	 * the queue when needed. -	 * -	 * The kernel modifies neither the indices array nor the entries -	 * array. -	 */ -	u32			array[]; -}; - -/* - * This data is shared with the application through the mmap at offset - * IORING_OFF_CQ_RING. - * - * The offsets to the member fields are published through struct - * io_cqring_offsets when calling io_uring_setup. - */ -struct io_cq_ring { -	/* -	 * Head and tail offsets into the ring; the offsets need to be -	 * masked to get valid indices. -	 * -	 * The application controls head and the kernel tail. -	 */ -	struct io_uring		r; -	/* -	 * Bitmask to apply to head and tail offsets (constant, equals -	 * ring_entries - 1) -	 */ -	u32			ring_mask; -	/* Ring size (constant, power of 2) */ -	u32			ring_entries; +	u32			sq_flags;  	/*  	 * Number of completion events lost because the queue was full;  	 * this should be avoided by the application by making sure @@ -177,7 +143,7 @@ struct io_cq_ring {  	 * As completion events come in out of order this counter is not  	 * ordered with any other data.  	 */ -	u32			overflow; +	u32			cq_overflow;  	/*  	 * Ring buffer of completion events.  	 * @@ -185,7 +151,7 @@ struct io_cq_ring {  	 * produced, so the application is allowed to modify pending  	 * entries.  	 
*/ -	struct io_uring_cqe	cqes[]; +	struct io_uring_cqe	cqes[] ____cacheline_aligned_in_smp;  };  struct io_mapped_ubuf { @@ -201,7 +167,7 @@ struct async_list {  	struct list_head	list;  	struct file		*file; -	off_t			io_end; +	off_t			io_start;  	size_t			io_len;  }; @@ -215,8 +181,18 @@ struct io_ring_ctx {  		bool			compat;  		bool			account_mem; -		/* SQ ring */ -		struct io_sq_ring	*sq_ring; +		/* +		 * Ring buffer of indices into array of io_uring_sqe, which is +		 * mmapped by the application using the IORING_OFF_SQES offset. +		 * +		 * This indirection could e.g. be used to assign fixed +		 * io_uring_sqe entries to operations and only submit them to +		 * the queue when needed. +		 * +		 * The kernel modifies neither the indices array nor the entries +		 * array. +		 */ +		u32			*sq_array;  		unsigned		cached_sq_head;  		unsigned		sq_entries;  		unsigned		sq_mask; @@ -227,15 +203,13 @@ struct io_ring_ctx {  	} ____cacheline_aligned_in_smp;  	/* IO offload */ -	struct workqueue_struct	*sqo_wq; +	struct workqueue_struct	*sqo_wq[2];  	struct task_struct	*sqo_thread;	/* if using sq thread polling */  	struct mm_struct	*sqo_mm;  	wait_queue_head_t	sqo_wait;  	struct completion	sqo_thread_started;  	struct { -		/* CQ ring */ -		struct io_cq_ring	*cq_ring;  		unsigned		cached_cq_tail;  		unsigned		cq_entries;  		unsigned		cq_mask; @@ -244,6 +218,8 @@ struct io_ring_ctx {  		struct eventfd_ctx	*cq_ev_fd;  	} ____cacheline_aligned_in_smp; +	struct io_rings	*rings; +  	/*  	 * If used, fixed file set. Writers must ensure that ->refs is dead,  	 * readers must ensure that ->refs is alive as long as the file* is @@ -288,6 +264,7 @@ struct io_ring_ctx {  struct sqe_submit {  	const struct io_uring_sqe	*sqe;  	unsigned short			index; +	u32				sequence;  	bool				has_user;  	bool				needs_lock;  	bool				needs_fixed_file; @@ -335,6 +312,7 @@ struct io_kiocb {  #define REQ_F_LINK		64	/* linked sqes */  #define REQ_F_LINK_DONE		128	/* linked sqes done */  #define REQ_F_FAIL_LINK		256	/* fail rest of links */ +#define REQ_F_SHADOW_DRAIN	512	/* link-drain shadow req */  	u64			user_data;  	u32			result;  	u32			sequence; @@ -366,6 +344,7 @@ struct io_submit_state {  };  static void io_sq_wq_submit_work(struct work_struct *work); +static void __io_free_req(struct io_kiocb *req);  static struct kmem_cache *req_cachep; @@ -430,7 +409,7 @@ static inline bool io_sequence_defer(struct io_ring_ctx *ctx,  	if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)  		return false; -	return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped; +	return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;  }  static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) @@ -451,11 +430,11 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)  static void __io_commit_cqring(struct io_ring_ctx *ctx)  { -	struct io_cq_ring *ring = ctx->cq_ring; +	struct io_rings *rings = ctx->rings; -	if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { +	if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {  		/* order cqe stores with ring update */ -		smp_store_release(&ring->r.tail, ctx->cached_cq_tail); +		smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);  		if (wq_has_sleeper(&ctx->cq_wait)) {  			wake_up_interruptible(&ctx->cq_wait); @@ -464,6 +443,24 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)  	}  } +static inline void io_queue_async_work(struct io_ring_ctx *ctx, +				       struct io_kiocb *req) +{ +	int rw; + +	switch 
(req->submit.sqe->opcode) { +	case IORING_OP_WRITEV: +	case IORING_OP_WRITE_FIXED: +		rw = !(req->rw.ki_flags & IOCB_DIRECT); +		break; +	default: +		rw = 0; +		break; +	} + +	queue_work(ctx->sqo_wq[rw], &req->work); +} +  static void io_commit_cqring(struct io_ring_ctx *ctx)  {  	struct io_kiocb *req; @@ -471,14 +468,19 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)  	__io_commit_cqring(ctx);  	while ((req = io_get_deferred_req(ctx)) != NULL) { +		if (req->flags & REQ_F_SHADOW_DRAIN) { +			/* Just for drain, free it. */ +			__io_free_req(req); +			continue; +		}  		req->flags |= REQ_F_IO_DRAINED; -		queue_work(ctx->sqo_wq, &req->work); +		io_queue_async_work(ctx, req);  	}  }  static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)  { -	struct io_cq_ring *ring = ctx->cq_ring; +	struct io_rings *rings = ctx->rings;  	unsigned tail;  	tail = ctx->cached_cq_tail; @@ -487,11 +489,11 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)  	 * control dependency is enough as we're using WRITE_ONCE to  	 * fill the cq entry  	 */ -	if (tail - READ_ONCE(ring->r.head) == ring->ring_entries) +	if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)  		return NULL;  	ctx->cached_cq_tail++; -	return &ring->cqes[tail & ctx->cq_mask]; +	return &rings->cqes[tail & ctx->cq_mask];  }  static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, @@ -510,9 +512,9 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,  		WRITE_ONCE(cqe->res, res);  		WRITE_ONCE(cqe->flags, 0);  	} else { -		unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); +		unsigned overflow = READ_ONCE(ctx->rings->cq_overflow); -		WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); +		WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1);  	}  } @@ -635,7 +637,7 @@ static void io_req_link_next(struct io_kiocb *req)  		nxt->flags |= REQ_F_LINK_DONE;  		INIT_WORK(&nxt->work, io_sq_wq_submit_work); -		queue_work(req->ctx->sqo_wq, &nxt->work); +		io_queue_async_work(req->ctx, nxt);  	}  } @@ -679,6 +681,13 @@ static void io_put_req(struct io_kiocb *req)  		io_free_req(req);  } +static unsigned io_cqring_events(struct io_rings *rings) +{ +	/* See comment at the top of this file */ +	smp_rmb(); +	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); +} +  /*   * Find and free completed poll iocbs   */ @@ -771,7 +780,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,  static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,  				long min)  { -	while (!list_empty(&ctx->poll_list)) { +	while (!list_empty(&ctx->poll_list) && !need_resched()) {  		int ret;  		ret = io_do_iopoll(ctx, nr_events, min); @@ -798,6 +807,12 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)  		unsigned int nr_events = 0;  		io_iopoll_getevents(ctx, &nr_events, 1); + +		/* +		 * Ensure we allow local-to-the-cpu processing to take place, +		 * in this case we need to ensure that we reap all events. +		 */ +		cond_resched();  	}  	mutex_unlock(&ctx->uring_lock);  } @@ -805,11 +820,42 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)  static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,  			   long min)  { -	int ret = 0; +	int iters, ret = 0; +	/* +	 * We disallow the app entering submit/complete with polling, but we +	 * still need to lock the ring to prevent racing with polled issue +	 * that got punted to a workqueue. 
+	 */ +	mutex_lock(&ctx->uring_lock); + +	iters = 0;  	do {  		int tmin = 0; +		/* +		 * Don't enter poll loop if we already have events pending. +		 * If we do, we can potentially be spinning for commands that +		 * already triggered a CQE (eg in error). +		 */ +		if (io_cqring_events(ctx->rings)) +			break; + +		/* +		 * If a submit got punted to a workqueue, we can have the +		 * application entering polling for a command before it gets +		 * issued. That app will hold the uring_lock for the duration +		 * of the poll right here, so we need to take a breather every +		 * now and then to ensure that the issue has a chance to add +		 * the poll to the issued list. Otherwise we can spin here +		 * forever, while the workqueue is stuck trying to acquire the +		 * very same mutex. +		 */ +		if (!(++iters & 7)) { +			mutex_unlock(&ctx->uring_lock); +			mutex_lock(&ctx->uring_lock); +		} +  		if (*nr_events < min)  			tmin = min - *nr_events; @@ -819,6 +865,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,  		ret = 0;  	} while (min && !*nr_events && !need_resched()); +	mutex_unlock(&ctx->uring_lock);  	return ret;  } @@ -1097,10 +1144,8 @@ static int io_import_fixed(struct io_ring_ctx *ctx, int rw,  			iter->bvec = bvec + seg_skip;  			iter->nr_segs -= seg_skip; -			iter->count -= (seg_skip << PAGE_SHIFT); +			iter->count -= bvec->bv_len + offset;  			iter->iov_offset = offset & ~PAGE_MASK; -			if (iter->iov_offset) -				iter->count -= iter->iov_offset;  		}  	} @@ -1144,6 +1189,28 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,  	return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);  } +static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb) +{ +	if (al->file == kiocb->ki_filp) { +		off_t start, end; + +		/* +		 * Allow merging if we're anywhere in the range of the same +		 * page. Generally this happens for sub-page reads or writes, +		 * and it's beneficial to allow the first worker to bring the +		 * page in and the piggy backed work can then work on the +		 * cached page. +		 */ +		start = al->io_start & PAGE_MASK; +		end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK; +		if (kiocb->ki_pos >= start && kiocb->ki_pos <= end) +			return true; +	} + +	al->file = NULL; +	return false; +} +  /*   * Make a note of the last file/offset/direction we punted to async   * context. We'll use this information to see if we can piggy back a @@ -1155,9 +1222,8 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)  	struct async_list *async_list = &req->ctx->pending_async[rw];  	struct kiocb *kiocb = &req->rw;  	struct file *filp = kiocb->ki_filp; -	off_t io_end = kiocb->ki_pos + len; -	if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { +	if (io_should_merge(async_list, kiocb)) {  		unsigned long max_bytes;  		/* Use 8x RA size as a decent limiter for both reads/writes */ @@ -1170,17 +1236,16 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)  			req->flags |= REQ_F_SEQ_PREV;  			async_list->io_len += len;  		} else { -			io_end = 0; -			async_list->io_len = 0; +			async_list->file = NULL;  		}  	}  	/* New file? Reset state. 
*/  	if (async_list->file != filp) { -		async_list->io_len = 0; +		async_list->io_start = kiocb->ki_pos; +		async_list->io_len = len;  		async_list->file = filp;  	} -	async_list->io_end = io_end;  }  static int io_read(struct io_kiocb *req, const struct sqe_submit *s, @@ -1492,7 +1557,7 @@ static void io_poll_remove_one(struct io_kiocb *req)  	WRITE_ONCE(poll->canceled, true);  	if (!list_empty(&poll->wait.entry)) {  		list_del_init(&poll->wait.entry); -		queue_work(req->ctx->sqo_wq, &req->work); +		io_queue_async_work(req->ctx, req);  	}  	spin_unlock(&poll->head->lock); @@ -1606,7 +1671,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,  		io_cqring_ev_posted(ctx);  		io_put_req(req);  	} else { -		queue_work(ctx->sqo_wq, &req->work); +		io_queue_async_work(ctx, req);  	}  	return 1; @@ -1949,7 +2014,7 @@ out:   */  static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)  { -	bool ret = false; +	bool ret;  	if (!list)  		return false; @@ -1995,10 +2060,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,  	flags = READ_ONCE(s->sqe->flags);  	fd = READ_ONCE(s->sqe->fd); -	if (flags & IOSQE_IO_DRAIN) { +	if (flags & IOSQE_IO_DRAIN)  		req->flags |= REQ_F_IO_DRAIN; -		req->sequence = ctx->cached_sq_head - 1; -	} +	/* +	 * All io need record the previous position, if LINK vs DARIN, +	 * it can be used to mark the position of the first IO in the +	 * link list. +	 */ +	req->sequence = s->sequence;  	if (!io_op_needs_file(s->sqe))  		return 0; @@ -2020,12 +2089,12 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,  	return 0;  } -static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, -			struct sqe_submit *s) +static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, +			struct sqe_submit *s, bool force_nonblock)  {  	int ret; -	ret = __io_submit_sqe(ctx, req, s, true); +	ret = __io_submit_sqe(ctx, req, s, force_nonblock);  	if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {  		struct io_uring_sqe *sqe_copy; @@ -2042,7 +2111,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,  				if (list)  					atomic_inc(&list->cnt);  				INIT_WORK(&req->work, io_sq_wq_submit_work); -				queue_work(ctx->sqo_wq, &req->work); +				io_queue_async_work(ctx, req);  			}  			/* @@ -2067,10 +2136,70 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,  	return ret;  } +static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, +			struct sqe_submit *s, bool force_nonblock) +{ +	int ret; + +	ret = io_req_defer(ctx, req, s->sqe); +	if (ret) { +		if (ret != -EIOCBQUEUED) { +			io_free_req(req); +			io_cqring_add_event(ctx, s->sqe->user_data, ret); +		} +		return 0; +	} + +	return __io_queue_sqe(ctx, req, s, force_nonblock); +} + +static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, +			      struct sqe_submit *s, struct io_kiocb *shadow, +			      bool force_nonblock) +{ +	int ret; +	int need_submit = false; + +	if (!shadow) +		return io_queue_sqe(ctx, req, s, force_nonblock); + +	/* +	 * Mark the first IO in link list as DRAIN, let all the following +	 * IOs enter the defer list. all IO needs to be completed before link +	 * list. 
+	 */ +	req->flags |= REQ_F_IO_DRAIN; +	ret = io_req_defer(ctx, req, s->sqe); +	if (ret) { +		if (ret != -EIOCBQUEUED) { +			io_free_req(req); +			io_cqring_add_event(ctx, s->sqe->user_data, ret); +			return 0; +		} +	} else { +		/* +		 * If ret == 0 means that all IOs in front of link io are +		 * running done. let's queue link head. +		 */ +		need_submit = true; +	} + +	/* Insert shadow req to defer_list, blocking next IOs */ +	spin_lock_irq(&ctx->completion_lock); +	list_add_tail(&shadow->list, &ctx->defer_list); +	spin_unlock_irq(&ctx->completion_lock); + +	if (need_submit) +		return __io_queue_sqe(ctx, req, s, force_nonblock); + +	return 0; +} +  #define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)  static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, -			  struct io_submit_state *state, struct io_kiocb **link) +			  struct io_submit_state *state, struct io_kiocb **link, +			  bool force_nonblock)  {  	struct io_uring_sqe *sqe_copy;  	struct io_kiocb *req; @@ -2097,13 +2226,6 @@ err:  		return;  	} -	ret = io_req_defer(ctx, req, s->sqe); -	if (ret) { -		if (ret != -EIOCBQUEUED) -			goto err_req; -		return; -	} -  	/*  	 * If we already have a head request, queue this one for async  	 * submittal once the head completes. If we don't have a head but @@ -2130,7 +2252,7 @@ err:  		INIT_LIST_HEAD(&req->link_list);  		*link = req;  	} else { -		io_queue_sqe(ctx, req, s); +		io_queue_sqe(ctx, req, s, force_nonblock);  	}  } @@ -2160,15 +2282,15 @@ static void io_submit_state_start(struct io_submit_state *state,  static void io_commit_sqring(struct io_ring_ctx *ctx)  { -	struct io_sq_ring *ring = ctx->sq_ring; +	struct io_rings *rings = ctx->rings; -	if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { +	if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {  		/*  		 * Ensure any loads from the SQEs are done at this point,  		 * since once we write the new head, the application could  		 * write new data to them.  		 */ -		smp_store_release(&ring->r.head, ctx->cached_sq_head); +		smp_store_release(&rings->sq.head, ctx->cached_sq_head);  	}  } @@ -2182,7 +2304,8 @@ static void io_commit_sqring(struct io_ring_ctx *ctx)   */  static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)  { -	struct io_sq_ring *ring = ctx->sq_ring; +	struct io_rings *rings = ctx->rings; +	u32 *sq_array = ctx->sq_array;  	unsigned head;  	/* @@ -2195,20 +2318,21 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)  	 */  	head = ctx->cached_sq_head;  	/* make sure SQ entry isn't read before tail */ -	if (head == smp_load_acquire(&ring->r.tail)) +	if (head == smp_load_acquire(&rings->sq.tail))  		return false; -	head = READ_ONCE(ring->array[head & ctx->sq_mask]); +	head = READ_ONCE(sq_array[head & ctx->sq_mask]);  	if (head < ctx->sq_entries) {  		s->index = head;  		s->sqe = &ctx->sq_sqes[head]; +		s->sequence = ctx->cached_sq_head;  		ctx->cached_sq_head++;  		return true;  	}  	/* drop invalid entries */  	ctx->cached_sq_head++; -	ring->dropped++; +	rings->sq_dropped++;  	return false;  } @@ -2217,6 +2341,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,  {  	struct io_submit_state state, *statep = NULL;  	struct io_kiocb *link = NULL; +	struct io_kiocb *shadow_req = NULL;  	bool prev_was_link = false;  	int i, submitted = 0; @@ -2231,11 +2356,21 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,  		 * that's the end of the chain. Submit the previous link.  		 
*/  		if (!prev_was_link && link) { -			io_queue_sqe(ctx, link, &link->submit); +			io_queue_link_head(ctx, link, &link->submit, shadow_req, +						true);  			link = NULL;  		}  		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; +		if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) { +			if (!shadow_req) { +				shadow_req = io_get_req(ctx, NULL); +				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); +				refcount_dec(&shadow_req->refs); +			} +			shadow_req->sequence = sqes[i].sequence; +		} +  		if (unlikely(mm_fault)) {  			io_cqring_add_event(ctx, sqes[i].sqe->user_data,  						-EFAULT); @@ -2243,13 +2378,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,  			sqes[i].has_user = has_user;  			sqes[i].needs_lock = true;  			sqes[i].needs_fixed_file = true; -			io_submit_sqe(ctx, &sqes[i], statep, &link); +			io_submit_sqe(ctx, &sqes[i], statep, &link, true);  			submitted++;  		}  	}  	if (link) -		io_queue_sqe(ctx, link, &link->submit); +		io_queue_link_head(ctx, link, &link->submit, shadow_req, true);  	if (statep)  		io_submit_state_end(&state); @@ -2280,15 +2415,7 @@ static int io_sq_thread(void *data)  			unsigned nr_events = 0;  			if (ctx->flags & IORING_SETUP_IOPOLL) { -				/* -				 * We disallow the app entering submit/complete -				 * with polling, but we still need to lock the -				 * ring to prevent racing with polled issue -				 * that got punted to a workqueue. -				 */ -				mutex_lock(&ctx->uring_lock);  				io_iopoll_check(ctx, &nr_events, 0); -				mutex_unlock(&ctx->uring_lock);  			} else {  				/*  				 * Normal IO, just pretend everything completed. @@ -2329,7 +2456,7 @@ static int io_sq_thread(void *data)  						TASK_INTERRUPTIBLE);  			/* Tell userspace we may need a wakeup call */ -			ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; +			ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;  			/* make sure to read SQ tail after writing flags */  			smp_mb(); @@ -2343,12 +2470,12 @@ static int io_sq_thread(void *data)  				schedule();  				finish_wait(&ctx->sqo_wait, &wait); -				ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; +				ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;  				continue;  			}  			finish_wait(&ctx->sqo_wait, &wait); -			ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; +			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;  		}  		i = 0; @@ -2389,10 +2516,12 @@ static int io_sq_thread(void *data)  	return 0;  } -static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) +static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit, +			  bool block_for_last)  {  	struct io_submit_state state, *statep = NULL;  	struct io_kiocb *link = NULL; +	struct io_kiocb *shadow_req = NULL;  	bool prev_was_link = false;  	int i, submit = 0; @@ -2402,6 +2531,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)  	}  	for (i = 0; i < to_submit; i++) { +		bool force_nonblock = true;  		struct sqe_submit s;  		if (!io_get_sqring(ctx, &s)) @@ -2412,34 +2542,49 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)  		 * that's the end of the chain. Submit the previous link.  		 
*/  		if (!prev_was_link && link) { -			io_queue_sqe(ctx, link, &link->submit); +			io_queue_link_head(ctx, link, &link->submit, shadow_req, +						force_nonblock);  			link = NULL;  		}  		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; +		if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) { +			if (!shadow_req) { +				shadow_req = io_get_req(ctx, NULL); +				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN); +				refcount_dec(&shadow_req->refs); +			} +			shadow_req->sequence = s.sequence; +		} +  		s.has_user = true;  		s.needs_lock = false;  		s.needs_fixed_file = false;  		submit++; -		io_submit_sqe(ctx, &s, statep, &link); + +		/* +		 * The caller will block for events after submit, submit the +		 * last IO non-blocking. This is either the only IO it's +		 * submitting, or it already submitted the previous ones. This +		 * improves performance by avoiding an async punt that we don't +		 * need to do. +		 */ +		if (block_for_last && submit == to_submit) +			force_nonblock = false; + +		io_submit_sqe(ctx, &s, statep, &link, force_nonblock);  	}  	io_commit_sqring(ctx);  	if (link) -		io_queue_sqe(ctx, link, &link->submit); +		io_queue_link_head(ctx, link, &link->submit, shadow_req, +					block_for_last);  	if (statep)  		io_submit_state_end(statep);  	return submit;  } -static unsigned io_cqring_events(struct io_cq_ring *ring) -{ -	/* See comment at the top of this file */ -	smp_rmb(); -	return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); -} -  /*   * Wait until events become available, if we don't already have some. The   * application must reap them itself, as they reside on the shared cq ring. @@ -2447,10 +2592,10 @@ static unsigned io_cqring_events(struct io_cq_ring *ring)  static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,  			  const sigset_t __user *sig, size_t sigsz)  { -	struct io_cq_ring *ring = ctx->cq_ring; +	struct io_rings *rings = ctx->rings;  	int ret; -	if (io_cqring_events(ring) >= min_events) +	if (io_cqring_events(rings) >= min_events)  		return 0;  	if (sig) { @@ -2466,12 +2611,12 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,  			return ret;  	} -	ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events); +	ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events);  	restore_saved_sigmask_unless(ret == -ERESTARTSYS);  	if (ret == -ERESTARTSYS)  		ret = -EINTR; -	return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; +	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? 
ret : 0;  }  static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) @@ -2521,11 +2666,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)  static void io_finish_async(struct io_ring_ctx *ctx)  { +	int i; +  	io_sq_thread_stop(ctx); -	if (ctx->sqo_wq) { -		destroy_workqueue(ctx->sqo_wq); -		ctx->sqo_wq = NULL; +	for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) { +		if (ctx->sqo_wq[i]) { +			destroy_workqueue(ctx->sqo_wq[i]); +			ctx->sqo_wq[i] = NULL; +		}  	}  } @@ -2733,16 +2882,31 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,  	}  	/* Do QD, or 2 * CPUS, whatever is smallest */ -	ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, +	ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq", +			WQ_UNBOUND | WQ_FREEZABLE,  			min(ctx->sq_entries - 1, 2 * num_online_cpus())); -	if (!ctx->sqo_wq) { +	if (!ctx->sqo_wq[0]) { +		ret = -ENOMEM; +		goto err; +	} + +	/* +	 * This is for buffered writes, where we want to limit the parallelism +	 * due to file locking in file systems. As "normal" buffered writes +	 * should parellelize on writeout quite nicely, limit us to having 2 +	 * pending. This avoids massive contention on the inode when doing +	 * buffered async writes. +	 */ +	ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq", +						WQ_UNBOUND | WQ_FREEZABLE, 2); +	if (!ctx->sqo_wq[1]) {  		ret = -ENOMEM;  		goto err;  	}  	return 0;  err: -	io_sq_thread_stop(ctx); +	io_finish_async(ctx);  	mmdrop(ctx->sqo_mm);  	ctx->sqo_mm = NULL;  	return ret; @@ -2791,17 +2955,45 @@ static void *io_mem_alloc(size_t size)  	return (void *) __get_free_pages(gfp_flags, get_order(size));  } +static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries, +				size_t *sq_offset) +{ +	struct io_rings *rings; +	size_t off, sq_array_size; + +	off = struct_size(rings, cqes, cq_entries); +	if (off == SIZE_MAX) +		return SIZE_MAX; + +#ifdef CONFIG_SMP +	off = ALIGN(off, SMP_CACHE_BYTES); +	if (off == 0) +		return SIZE_MAX; +#endif + +	sq_array_size = array_size(sizeof(u32), sq_entries); +	if (sq_array_size == SIZE_MAX) +		return SIZE_MAX; + +	if (check_add_overflow(off, sq_array_size, &off)) +		return SIZE_MAX; + +	if (sq_offset) +		*sq_offset = off; + +	return off; +} +  static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)  { -	struct io_sq_ring *sq_ring; -	struct io_cq_ring *cq_ring; -	size_t bytes; +	size_t pages; -	bytes = struct_size(sq_ring, array, sq_entries); -	bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); -	bytes += struct_size(cq_ring, cqes, cq_entries); +	pages = (size_t)1 << get_order( +		rings_size(sq_entries, cq_entries, NULL)); +	pages += (size_t)1 << get_order( +		array_size(sizeof(struct io_uring_sqe), sq_entries)); -	return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; +	return pages;  }  static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) @@ -2815,7 +3007,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)  		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];  		for (j = 0; j < imu->nr_bvecs; j++) -			put_page(imu->bvec[j].bv_page); +			put_user_page(imu->bvec[j].bv_page);  		if (ctx->account_mem)  			io_unaccount_mem(ctx->user, imu->nr_bvecs); @@ -2959,10 +3151,8 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,  			 * if we did partial map, or found file backed vmas,  			 * release any pages we did get  			 */ -			if (pret > 0) { -				for (j = 0; j < pret; j++) -					put_page(pages[j]); -			} +			if (pret > 0) +				put_user_pages(pages, pret);  			if (ctx->account_mem)  
				io_unaccount_mem(ctx->user, nr_pages);  			kvfree(imu->bvec); @@ -3048,9 +3238,8 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)  	}  #endif -	io_mem_free(ctx->sq_ring); +	io_mem_free(ctx->rings);  	io_mem_free(ctx->sq_sqes); -	io_mem_free(ctx->cq_ring);  	percpu_ref_exit(&ctx->refs);  	if (ctx->account_mem) @@ -3071,10 +3260,10 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)  	 * io_commit_cqring  	 */  	smp_rmb(); -	if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head != -	    ctx->sq_ring->ring_entries) +	if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head != +	    ctx->rings->sq_ring_entries)  		mask |= EPOLLOUT | EPOLLWRNORM; -	if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) +	if (READ_ONCE(ctx->rings->sq.head) != ctx->cached_cq_tail)  		mask |= EPOLLIN | EPOLLRDNORM;  	return mask; @@ -3119,14 +3308,12 @@ static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)  	switch (offset) {  	case IORING_OFF_SQ_RING: -		ptr = ctx->sq_ring; +	case IORING_OFF_CQ_RING: +		ptr = ctx->rings;  		break;  	case IORING_OFF_SQES:  		ptr = ctx->sq_sqes;  		break; -	case IORING_OFF_CQ_RING: -		ptr = ctx->cq_ring; -		break;  	default:  		return -EINVAL;  	} @@ -3169,19 +3356,27 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,  	 * Just return the requested submit count, and wake the thread if  	 * we were asked to.  	 */ +	ret = 0;  	if (ctx->flags & IORING_SETUP_SQPOLL) {  		if (flags & IORING_ENTER_SQ_WAKEUP)  			wake_up(&ctx->sqo_wait);  		submitted = to_submit; -		goto out_ctx; -	} +	} else if (to_submit) { +		bool block_for_last = false; -	ret = 0; -	if (to_submit) {  		to_submit = min(to_submit, ctx->sq_entries); +		/* +		 * Allow last submission to block in a series, IFF the caller +		 * asked to wait for events and we don't currently have +		 * enough. This potentially avoids an async punt. 
+		 */ +		if (to_submit == min_complete && +		    io_cqring_events(ctx->rings) < min_complete) +			block_for_last = true; +  		mutex_lock(&ctx->uring_lock); -		submitted = io_ring_submit(ctx, to_submit); +		submitted = io_ring_submit(ctx, to_submit, block_for_last);  		mutex_unlock(&ctx->uring_lock);  	}  	if (flags & IORING_ENTER_GETEVENTS) { @@ -3190,15 +3385,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,  		min_complete = min(min_complete, ctx->cq_entries);  		if (ctx->flags & IORING_SETUP_IOPOLL) { -			mutex_lock(&ctx->uring_lock);  			ret = io_iopoll_check(ctx, &nr_events, min_complete); -			mutex_unlock(&ctx->uring_lock);  		} else {  			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);  		}  	} -out_ctx:  	io_ring_drop_ctx_refs(ctx, 1);  out_fput:  	fdput(f); @@ -3215,19 +3407,27 @@ static const struct file_operations io_uring_fops = {  static int io_allocate_scq_urings(struct io_ring_ctx *ctx,  				  struct io_uring_params *p)  { -	struct io_sq_ring *sq_ring; -	struct io_cq_ring *cq_ring; -	size_t size; +	struct io_rings *rings; +	size_t size, sq_array_offset; -	sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); -	if (!sq_ring) +	size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); +	if (size == SIZE_MAX) +		return -EOVERFLOW; + +	rings = io_mem_alloc(size); +	if (!rings)  		return -ENOMEM; -	ctx->sq_ring = sq_ring; -	sq_ring->ring_mask = p->sq_entries - 1; -	sq_ring->ring_entries = p->sq_entries; -	ctx->sq_mask = sq_ring->ring_mask; -	ctx->sq_entries = sq_ring->ring_entries; +	ctx->rings = rings; +	ctx->sq_array = (u32 *)((char *)rings + sq_array_offset); +	rings->sq_ring_mask = p->sq_entries - 1; +	rings->cq_ring_mask = p->cq_entries - 1; +	rings->sq_ring_entries = p->sq_entries; +	rings->cq_ring_entries = p->cq_entries; +	ctx->sq_mask = rings->sq_ring_mask; +	ctx->cq_mask = rings->cq_ring_mask; +	ctx->sq_entries = rings->sq_ring_entries; +	ctx->cq_entries = rings->cq_ring_entries;  	size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);  	if (size == SIZE_MAX) @@ -3237,15 +3437,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,  	if (!ctx->sq_sqes)  		return -ENOMEM; -	cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); -	if (!cq_ring) -		return -ENOMEM; - -	ctx->cq_ring = cq_ring; -	cq_ring->ring_mask = p->cq_entries - 1; -	cq_ring->ring_entries = p->cq_entries; -	ctx->cq_mask = cq_ring->ring_mask; -	ctx->cq_entries = cq_ring->ring_entries;  	return 0;  } @@ -3349,21 +3540,23 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)  		goto err;  	memset(&p->sq_off, 0, sizeof(p->sq_off)); -	p->sq_off.head = offsetof(struct io_sq_ring, r.head); -	p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); -	p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); -	p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); -	p->sq_off.flags = offsetof(struct io_sq_ring, flags); -	p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); -	p->sq_off.array = offsetof(struct io_sq_ring, array); +	p->sq_off.head = offsetof(struct io_rings, sq.head); +	p->sq_off.tail = offsetof(struct io_rings, sq.tail); +	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask); +	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries); +	p->sq_off.flags = offsetof(struct io_rings, sq_flags); +	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped); +	p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;  	memset(&p->cq_off, 0, sizeof(p->cq_off)); -	
p->cq_off.head = offsetof(struct io_cq_ring, r.head); -	p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); -	p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); -	p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); -	p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); -	p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); +	p->cq_off.head = offsetof(struct io_rings, cq.head); +	p->cq_off.tail = offsetof(struct io_rings, cq.tail); +	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask); +	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries); +	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow); +	p->cq_off.cqes = offsetof(struct io_rings, cqes); + +	p->features = IORING_FEAT_SINGLE_MMAP;  	return ret;  err:  	io_ring_ctx_wait_and_kill(ctx);  |
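
The single io_rings allocation introduced above is what allows userspace to map the SQ and CQ rings with one mmap() once the kernel advertises IORING_FEAT_SINGLE_MMAP: sq_off.array is now a byte offset into that shared mapping rather than into a separate SQ ring structure. Below is a minimal, illustrative application-side sketch of that setup. The helper name setup_single_mmap() and the trimmed error handling are hypothetical, and it assumes __NR_io_uring_setup plus the IORING_* constants and the io_uring_params 'features' field are available from the installed kernel headers; only the sizing logic reflects the layout built by io_allocate_scq_urings() in the patch.

/*
 * Illustrative userspace sketch (not part of the patch): map the merged
 * SQ/CQ ring with a single mmap() when IORING_FEAT_SINGLE_MMAP is set.
 */
#include <linux/io_uring.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <unistd.h>

static int setup_single_mmap(unsigned entries, struct io_uring_params *p,
			     void **ring_ptr, size_t *ring_sz)
{
	size_t sq_sz, cq_sz;
	int fd;

	memset(p, 0, sizeof(*p));
	fd = syscall(__NR_io_uring_setup, entries, p);
	if (fd < 0)
		return -1;

	/* sq_off.array is now an offset into the shared io_rings mapping. */
	sq_sz = p->sq_off.array + p->sq_entries * sizeof(__u32);
	cq_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);

	/* One mapping must span both rings when the feature is advertised. */
	if (p->features & IORING_FEAT_SINGLE_MMAP)
		*ring_sz = sq_sz > cq_sz ? sq_sz : cq_sz;
	else
		*ring_sz = sq_sz;	/* CQ ring needs its own mmap */

	*ring_ptr = mmap(NULL, *ring_sz, PROT_READ | PROT_WRITE,
			 MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	if (*ring_ptr == MAP_FAILED)
		return -1;

	/*
	 * The SQE array is still a separate mapping at IORING_OFF_SQES;
	 * without IORING_FEAT_SINGLE_MMAP the CQ ring would also need its
	 * own mmap at IORING_OFF_CQ_RING.
	 */
	return fd;
}

liburing performs the same max(sq_sz, cq_sz) sizing when the feature flag is present; kernels without the flag still require a second mmap at IORING_OFF_CQ_RING, which is why the fallback path above is kept.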