Diffstat (limited to 'fs/xfs/xfs_buf.c')
-rw-r--r--	fs/xfs/xfs_buf.c	268
1 file changed, 182 insertions, 86 deletions
diff --git a/fs/xfs/xfs_buf.c b/fs/xfs/xfs_buf.c
index e71cfbd5acb3..47a318ce82e0 100644
--- a/fs/xfs/xfs_buf.c
+++ b/fs/xfs/xfs_buf.c
@@ -80,6 +80,47 @@ xfs_buf_vmap_len(
 }
 
 /*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for sychronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+	struct xfs_buf	*bp)
+{
+	if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags |= _XBF_IN_FLIGHT;
+	percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+	struct xfs_buf	*bp)
+{
+	if (!(bp->b_flags & _XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags &= ~_XBF_IN_FLIGHT;
+	percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
+/*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
  * reference count falls to zero. If the buffer is already on the LRU, we need
@@ -102,6 +143,14 @@ xfs_buf_stale(
 	 */
 	bp->b_flags &= ~_XBF_DELWRI_Q;
 
+	/*
+	 * Once the buffer is marked stale and unlocked, a subsequent lookup
+	 * could reset b_flags. There is no guarantee that the buffer is
+	 * unaccounted (released to LRU) before that occurs. Drop in-flight
+	 * status now to preserve accounting consistency.
+	 */
+	xfs_buf_ioacct_dec(bp);
+
 	spin_lock(&bp->b_lock);
 	atomic_set(&bp->b_lru_ref, 0);
 	if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
@@ -815,7 +864,8 @@ xfs_buf_get_uncached(
 	struct xfs_buf		*bp;
 	DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks);
 
-	bp = _xfs_buf_alloc(target, &map, 1, 0);
+	/* flags might contain irrelevant bits, pass only what we care about */
+	bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT);
 	if (unlikely(bp == NULL))
 		goto fail;
 
@@ -866,63 +916,85 @@ xfs_buf_hold(
 }
 
 /*
- *	Releases a hold on the specified buffer.  If the
- *	the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
 	xfs_buf_t		*bp)
 {
 	struct xfs_perag	*pag = bp->b_pag;
+	bool			release;
+	bool			freebuf = false;
 
 	trace_xfs_buf_rele(bp, _RET_IP_);
 
 	if (!pag) {
 		ASSERT(list_empty(&bp->b_lru));
 		ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-		if (atomic_dec_and_test(&bp->b_hold))
+		if (atomic_dec_and_test(&bp->b_hold)) {
+			xfs_buf_ioacct_dec(bp);
 			xfs_buf_free(bp);
+		}
 		return;
 	}
 
 	ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
 	ASSERT(atomic_read(&bp->b_hold) > 0);
-	if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-		spin_lock(&bp->b_lock);
-		if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-			/*
-			 * If the buffer is added to the LRU take a new
-			 * reference to the buffer for the LRU and clear the
-			 * (now stale) dispose list state flag
-			 */
-			if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-				bp->b_state &= ~XFS_BSTATE_DISPOSE;
-				atomic_inc(&bp->b_hold);
-			}
-			spin_unlock(&bp->b_lock);
-			spin_unlock(&pag->pag_buf_lock);
-		} else {
-			/*
-			 * most of the time buffers will already be removed from
-			 * the LRU, so optimise that case by checking for the
-			 * XFS_BSTATE_DISPOSE flag indicating the last list the
-			 * buffer was on was the disposal list
-			 */
-			if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-				list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-			} else {
-				ASSERT(list_empty(&bp->b_lru));
-			}
-			spin_unlock(&bp->b_lock);
-			ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-			rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-			spin_unlock(&pag->pag_buf_lock);
-			xfs_perag_put(pag);
-			xfs_buf_free(bp);
+	release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+	spin_lock(&bp->b_lock);
+	if (!release) {
+		/*
+		 * Drop the in-flight state if the buffer is already on the LRU
+		 * and it holds the only reference. This is racy because we
+		 * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+		 * ensures the decrement occurs only once per-buf.
+		 */
+		if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+			xfs_buf_ioacct_dec(bp);
+		goto out_unlock;
+	}
+
+	/* the last reference has been dropped ... */
+	xfs_buf_ioacct_dec(bp);
+	if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+		/*
+		 * If the buffer is added to the LRU take a new reference to the
+		 * buffer for the LRU and clear the (now stale) dispose list
+		 * state flag
+		 */
+		if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+			bp->b_state &= ~XFS_BSTATE_DISPOSE;
+			atomic_inc(&bp->b_hold);
+		}
+		spin_unlock(&pag->pag_buf_lock);
+	} else {
+		/*
+		 * most of the time buffers will already be removed from the
+		 * LRU, so optimise that case by checking for the
+		 * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+		 * was on was the disposal list
+		 */
+		if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+			list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+		} else {
+			ASSERT(list_empty(&bp->b_lru));
 		}
+
+		ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+		rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+		spin_unlock(&pag->pag_buf_lock);
+		xfs_perag_put(pag);
+		freebuf = true;
 	}
+
+out_unlock:
+	spin_unlock(&bp->b_lock);
+
+	if (freebuf)
+		xfs_buf_free(bp);
 }
@@ -944,10 +1016,12 @@ xfs_buf_trylock(
 	int			locked;
 
 	locked = down_trylock(&bp->b_sema) == 0;
-	if (locked)
+	if (locked) {
 		XB_SET_OWNER(bp);
-
-	trace_xfs_buf_trylock(bp, _RET_IP_);
+		trace_xfs_buf_trylock(bp, _RET_IP_);
+	} else {
+		trace_xfs_buf_trylock_fail(bp, _RET_IP_);
+	}
 
 	return locked;
 }
@@ -1127,7 +1201,8 @@ xfs_buf_ioapply_map(
 	int		map,
 	int		*buf_offset,
 	int		*count,
-	int		rw)
+	int		op,
+	int		op_flags)
 {
 	int		page_index;
 	int		total_nr_pages = bp->b_page_count;
@@ -1157,16 +1232,14 @@
 
 next_chunk:
 	atomic_inc(&bp->b_io_remaining);
-	nr_pages = BIO_MAX_SECTORS >> (PAGE_SHIFT - BBSHIFT);
-	if (nr_pages > total_nr_pages)
-		nr_pages = total_nr_pages;
+	nr_pages = min(total_nr_pages, BIO_MAX_PAGES);
 
 	bio = bio_alloc(GFP_NOIO, nr_pages);
 	bio->bi_bdev = bp->b_target->bt_bdev;
 	bio->bi_iter.bi_sector = sector;
 	bio->bi_end_io = xfs_buf_bio_end_io;
 	bio->bi_private = bp;
-
+	bio_set_op_attrs(bio, op, op_flags);
 
 	for (; size && nr_pages; nr_pages--, page_index++) {
 		int	rbytes, nbytes = PAGE_SIZE - offset;
@@ -1190,7 +1263,7 @@ next_chunk:
 			flush_kernel_vmap_range(bp->b_addr,
 						xfs_buf_vmap_len(bp));
 		}
-		submit_bio(rw, bio);
+		submit_bio(bio);
 		if (size)
 			goto next_chunk;
 	} else {
@@ -1210,7 +1283,8 @@ _xfs_buf_ioapply(
 	struct xfs_buf	*bp)
 {
 	struct blk_plug	plug;
-	int		rw;
+	int		op;
+	int		op_flags = 0;
 	int		offset;
 	int		size;
 	int		i;
@@ -1229,14 +1303,13 @@ _xfs_buf_ioapply(
 		bp->b_ioend_wq = bp->b_target->bt_mount->m_buf_workqueue;
 
 	if (bp->b_flags & XBF_WRITE) {
+		op = REQ_OP_WRITE;
 		if (bp->b_flags & XBF_SYNCIO)
-			rw = WRITE_SYNC;
-		else
-			rw = WRITE;
+			op_flags = WRITE_SYNC;
 		if (bp->b_flags & XBF_FUA)
-			rw |= REQ_FUA;
+			op_flags |= REQ_FUA;
 		if (bp->b_flags & XBF_FLUSH)
-			rw |= REQ_FLUSH;
+			op_flags |= REQ_PREFLUSH;
 
 		/*
 		 * Run the write verifier callback function if it exists. If
@@ -1266,13 +1339,14 @@ _xfs_buf_ioapply(
 			}
 		}
 	} else if (bp->b_flags & XBF_READ_AHEAD) {
-		rw = READA;
+		op = REQ_OP_READ;
+		op_flags = REQ_RAHEAD;
 	} else {
-		rw = READ;
+		op = REQ_OP_READ;
 	}
 
 	/* we only use the buffer cache for meta-data */
-	rw |= REQ_META;
+	op_flags |= REQ_META;
 
 	/*
 	 * Walk all the vectors issuing IO on them. Set up the initial offset
@@ -1284,7 +1358,7 @@ _xfs_buf_ioapply(
 	size = BBTOB(bp->b_io_length);
 	blk_start_plug(&plug);
 	for (i = 0; i < bp->b_map_count; i++) {
-		xfs_buf_ioapply_map(bp, i, &offset, &size, rw);
+		xfs_buf_ioapply_map(bp, i, &offset, &size, op, op_flags);
 		if (bp->b_error)
 			break;
 		if (size <= 0)
@@ -1339,6 +1413,7 @@ xfs_buf_submit(
 	 * xfs_buf_ioend too early.
 	 */
 	atomic_set(&bp->b_io_remaining, 1);
+	xfs_buf_ioacct_inc(bp);
 	_xfs_buf_ioapply(bp);
 
 	/*
@@ -1524,13 +1599,19 @@ xfs_wait_buftarg(
 	int loop = 0;
 
 	/*
-	 * We need to flush the buffer workqueue to ensure that all IO
-	 * completion processing is 100% done. Just waiting on buffer locks is
-	 * not sufficient for async IO as the reference count held over IO is
-	 * not released until after the buffer lock is dropped. Hence we need to
-	 * ensure here that all reference counts have been dropped before we
-	 * start walking the LRU list.
+	 * First wait on the buftarg I/O count for all in-flight buffers to be
+	 * released. This is critical as new buffers do not make the LRU until
+	 * they are released.
+	 *
+	 * Next, flush the buffer workqueue to ensure all completion processing
+	 * has finished. Just waiting on buffer locks is not sufficient for
+	 * async IO as the reference count held over IO is not released until
+	 * after the buffer lock is dropped. Hence we need to ensure here that
+	 * all reference counts have been dropped before we start walking the
+	 * LRU list.
 	 */
+	while (percpu_counter_sum(&btp->bt_io_count))
+		delay(100);
 	drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
 	/* loop until there is nothing left on the lru list. */
@@ -1627,6 +1708,8 @@ xfs_free_buftarg(
 	struct xfs_buftarg	*btp)
 {
 	unregister_shrinker(&btp->bt_shrinker);
+	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+	percpu_counter_destroy(&btp->bt_io_count);
 	list_lru_destroy(&btp->bt_lru);
 
 	if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1691,6 +1774,9 @@ xfs_alloc_buftarg(
 	if (list_lru_init(&btp->bt_lru))
 		goto error;
 
+	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+		goto error;
+
 	btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
 	btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
 	btp->bt_shrinker.seeks = DEFAULT_SEEKS;
@@ -1774,18 +1860,33 @@ xfs_buf_cmp(
 	return 0;
 }
 
+/*
+ * submit buffers for write.
+ *
+ * When we have a large buffer list, we do not want to hold all the buffers
+ * locked while we block on the request queue waiting for IO dispatch. To avoid
+ * this problem, we lock and submit buffers in groups of 50, thereby minimising
+ * the lock hold times for lists which may contain thousands of objects.
+ *
+ * To do this, we sort the buffer list before we walk the list to lock and
+ * submit buffers, and we plug and unplug around each group of buffers we
+ * submit.
+ */
 static int
-__xfs_buf_delwri_submit(
+xfs_buf_delwri_submit_buffers(
 	struct list_head	*buffer_list,
-	struct list_head	*io_list,
-	bool			wait)
+	struct list_head	*wait_list)
 {
-	struct blk_plug		plug;
 	struct xfs_buf		*bp, *n;
+	LIST_HEAD		(submit_list);
 	int			pinned = 0;
+	struct blk_plug		plug;
+	list_sort(NULL, buffer_list, xfs_buf_cmp);
+
+	blk_start_plug(&plug);
 
 	list_for_each_entry_safe(bp, n, buffer_list, b_list) {
-		if (!wait) {
+		if (!wait_list) {
 			if (xfs_buf_ispinned(bp)) {
 				pinned++;
 				continue;
@@ -1808,25 +1909,21 @@ __xfs_buf_delwri_submit(
 			continue;
 		}
 
-		list_move_tail(&bp->b_list, io_list);
 		trace_xfs_buf_delwri_split(bp, _RET_IP_);
-	}
-
-	list_sort(NULL, io_list, xfs_buf_cmp);
-
-	blk_start_plug(&plug);
-	list_for_each_entry_safe(bp, n, io_list, b_list) {
-		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL);
-		bp->b_flags |= XBF_WRITE | XBF_ASYNC;
 
 		/*
-		 * we do all Io submission async. This means if we need to wait
-		 * for IO completion we need to take an extra reference so the
-		 * buffer is still valid on the other side.
+		 * We do all IO submission async. This means if we need
+		 * to wait for IO completion we need to take an extra
+		 * reference so the buffer is still valid on the other
+		 * side. We need to move the buffer onto the io_list
+		 * at this point so the caller can still access it.
 		 */
-		if (wait)
+		bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL);
+		bp->b_flags |= XBF_WRITE | XBF_ASYNC;
+		if (wait_list) {
 			xfs_buf_hold(bp);
-		else
+			list_move_tail(&bp->b_list, wait_list);
+		} else
 			list_del_init(&bp->b_list);
 
 		xfs_buf_submit(bp);
@@ -1849,8 +1946,7 @@ int
 xfs_buf_delwri_submit_nowait(
 	struct list_head	*buffer_list)
 {
-	LIST_HEAD		(io_list);
-	return __xfs_buf_delwri_submit(buffer_list, &io_list, false);
+	return xfs_buf_delwri_submit_buffers(buffer_list, NULL);
 }
 
 /*
@@ -1865,15 +1961,15 @@ int
 xfs_buf_delwri_submit(
 	struct list_head	*buffer_list)
 {
-	LIST_HEAD		(io_list);
+	LIST_HEAD		(wait_list);
 	int			error = 0, error2;
 	struct xfs_buf		*bp;
 
-	__xfs_buf_delwri_submit(buffer_list, &io_list, true);
+	xfs_buf_delwri_submit_buffers(buffer_list, &wait_list);
 
 	/* Wait for IO to complete. */
-	while (!list_empty(&io_list)) {
-		bp = list_first_entry(&io_list, struct xfs_buf, b_list);
+	while (!list_empty(&wait_list)) {
+		bp = list_first_entry(&wait_list, struct xfs_buf, b_list);
 		list_del_init(&bp->b_list);
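
Taken together, xfs_buf_ioacct_inc()/xfs_buf_ioacct_dec() and the new wait loop in xfs_wait_buftarg() implement a simple pattern: count a buffer at most once per hold cycle when it is first submitted for async I/O, uncount it when it is released (or marked stale), and spin at unmount until the count drains to zero. The sketch below is a minimal userspace model of that pattern, using a C11 atomic counter in place of the kernel's percpu_counter; the buffer type, flag values and function names are illustrative, not the kernel's.

#include <stdatomic.h>
#include <stdio.h>

#define BUF_NO_IOACCT	(1u << 0)	/* illustrative flag values, not the kernel's */
#define BUF_IN_FLIGHT	(1u << 1)

struct buf {
	unsigned int flags;
};

/* stands in for the buftarg's per-cpu bt_io_count */
static atomic_long io_count;

/* count the buffer once per hold cycle, on first async submission */
static void ioacct_inc(struct buf *bp)
{
	if (bp->flags & (BUF_NO_IOACCT | BUF_IN_FLIGHT))
		return;
	bp->flags |= BUF_IN_FLIGHT;
	atomic_fetch_add(&io_count, 1);
}

/* uncount on release or stale; safe to call even if the buffer was never counted */
static void ioacct_dec(struct buf *bp)
{
	if (!(bp->flags & BUF_IN_FLIGHT))
		return;
	bp->flags &= ~BUF_IN_FLIGHT;
	atomic_fetch_sub(&io_count, 1);
}

int main(void)
{
	struct buf bp = { .flags = 0 };

	ioacct_inc(&bp);	/* first submission: counted */
	ioacct_inc(&bp);	/* resubmission in the same hold cycle: no-op */
	printf("in flight: %ld\n", atomic_load(&io_count));	/* 1 */

	ioacct_dec(&bp);	/* release: uncounted exactly once */
	printf("in flight: %ld\n", atomic_load(&io_count));	/* 0 */

	/* unmount-style quiesce: proceed only once nothing is in flight */
	while (atomic_load(&io_count))
		;	/* the kernel sleeps here via delay(100) instead of spinning */
	return 0;
}

Buffers that are never released would keep this counter above zero forever, which is exactly why the diff introduces XBF_NO_IOACCT for the superblock and iclog buffers.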
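On the I/O path, _xfs_buf_ioapply() now derives a request op plus separate op flags from the buffer flags instead of building a single rw value, and hands them to each bio via bio_set_op_attrs() before submit_bio(). The fragment below models just that mapping decision as a standalone function; the BUF_*, OP_* and F_* constants are made-up stand-ins for the kernel's XBF_*, REQ_OP_* and REQ_* values, and only the branch structure follows the diff.

#include <stdio.h>

/* illustrative stand-ins for the XBF_* buffer flags */
#define BUF_WRITE	(1u << 0)
#define BUF_SYNCIO	(1u << 1)
#define BUF_FUA		(1u << 2)
#define BUF_FLUSH	(1u << 3)
#define BUF_READ_AHEAD	(1u << 4)

/* illustrative stand-ins for REQ_OP_* and the REQ_* modifier flags */
enum req_op { OP_READ, OP_WRITE };
#define F_SYNC		(1u << 0)
#define F_FUA		(1u << 1)
#define F_PREFLUSH	(1u << 2)
#define F_RAHEAD	(1u << 3)
#define F_META		(1u << 4)

/* map buffer state to (op, op_flags), mirroring the branch structure above */
static enum req_op buf_to_op(unsigned int bflags, unsigned int *op_flags)
{
	enum req_op op;

	*op_flags = 0;
	if (bflags & BUF_WRITE) {
		op = OP_WRITE;
		if (bflags & BUF_SYNCIO)
			*op_flags = F_SYNC;
		if (bflags & BUF_FUA)
			*op_flags |= F_FUA;
		if (bflags & BUF_FLUSH)
			*op_flags |= F_PREFLUSH;
	} else if (bflags & BUF_READ_AHEAD) {
		op = OP_READ;
		*op_flags = F_RAHEAD;
	} else {
		op = OP_READ;
	}
	*op_flags |= F_META;	/* the buffer cache is used for metadata only */
	return op;
}

int main(void)
{
	unsigned int flags;
	enum req_op op = buf_to_op(BUF_WRITE | BUF_FUA, &flags);

	printf("op=%d flags=%#x\n", op, flags);	/* OP_WRITE, F_FUA|F_META */
	return 0;
}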
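Finally, xfs_buf_delwri_submit_buffers() sorts the delwri list up front and then locks and submits every buffer inside a single plugged section, optionally moving each one onto a caller-supplied wait list so xfs_buf_delwri_submit() can wait for completion afterwards (xfs_buf_delwri_submit_nowait() passes NULL and does not wait). The toy below models only that control flow in plain userspace C, with an array and qsort standing in for the list and list_sort() and printf standing in for real submission; all names are illustrative.

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

struct buf {
	long	blockno;	/* sort key, like the buffer's disk address */
};

static int buf_cmp(const void *a, const void *b)
{
	const struct buf *x = *(struct buf * const *)a;
	const struct buf *y = *(struct buf * const *)b;

	return (x->blockno > y->blockno) - (x->blockno < y->blockno);
}

/*
 * Submit every buffer in ascending block order inside one "plugged" section.
 * If wait_list is non-NULL, remember each buffer there so the caller can wait
 * for it afterwards; otherwise forget it once submitted (the nowait case).
 */
static void delwri_submit_buffers(struct buf **bufs, size_t n,
				  struct buf **wait_list, size_t *nwait)
{
	qsort(bufs, n, sizeof(bufs[0]), buf_cmp);	/* stands in for list_sort() */

	/* blk_start_plug() would go here: batch up the requests queued below */
	for (size_t i = 0; i < n; i++) {
		printf("submit block %ld\n", bufs[i]->blockno);
		if (wait_list)
			wait_list[(*nwait)++] = bufs[i];	/* like list_move_tail() */
	}
	/* blk_finish_plug() would go here: flush the whole batch to the device */
}

int main(void)
{
	struct buf a = { 42 }, b = { 7 }, c = { 19 };
	struct buf *list[] = { &a, &b, &c };
	struct buf *wait[3];
	size_t nwait = 0;

	delwri_submit_buffers(list, 3, wait, &nwait);	/* submits 7, 19, 42 */
	printf("%zu buffers to wait for\n", nwait);
	return 0;
}

Sorting before submission keeps the I/O roughly sequential on disk, and batching it behind a plug lets the block layer merge and dispatch the requests together instead of one at a time.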