Diffstat (limited to 'lib/sbitmap.c')
-rw-r--r--	lib/sbitmap.c	115
1 file changed, 82 insertions, 33 deletions
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index e6a9c06ec70c..fdd1b8aa8ac6 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -52,7 +52,7 @@ int sbitmap_init_node(struct sbitmap *sb, unsigned int depth, int shift,
 		return 0;
 	}
 
-	sb->map = kzalloc_node(sb->map_nr * sizeof(*sb->map), flags, node);
+	sb->map = kcalloc_node(sb->map_nr, sizeof(*sb->map), flags, node);
 	if (!sb->map)
 		return -ENOMEM;
 
@@ -270,18 +270,33 @@ void sbitmap_bitmap_show(struct sbitmap *sb, struct seq_file *m)
 }
 EXPORT_SYMBOL_GPL(sbitmap_bitmap_show);
 
-static unsigned int sbq_calc_wake_batch(unsigned int depth)
+static unsigned int sbq_calc_wake_batch(struct sbitmap_queue *sbq,
+					unsigned int depth)
 {
 	unsigned int wake_batch;
+	unsigned int shallow_depth;
 
 	/*
 	 * For each batch, we wake up one queue. We need to make sure that our
-	 * batch size is small enough that the full depth of the bitmap is
-	 * enough to wake up all of the queues.
+	 * batch size is small enough that the full depth of the bitmap,
+	 * potentially limited by a shallow depth, is enough to wake up all of
+	 * the queues.
+	 *
+	 * Each full word of the bitmap has bits_per_word bits, and there might
+	 * be a partial word. There are depth / bits_per_word full words and
+	 * depth % bits_per_word bits left over. In bitwise arithmetic:
+	 *
+	 * bits_per_word = 1 << shift
+	 * depth / bits_per_word = depth >> shift
+	 * depth % bits_per_word = depth & ((1 << shift) - 1)
+	 *
+	 * Each word can be limited to sbq->min_shallow_depth bits.
 	 */
-	wake_batch = SBQ_WAKE_BATCH;
-	if (wake_batch > depth / SBQ_WAIT_QUEUES)
-		wake_batch = max(1U, depth / SBQ_WAIT_QUEUES);
+	shallow_depth = min(1U << sbq->sb.shift, sbq->min_shallow_depth);
+	depth = ((depth >> sbq->sb.shift) * shallow_depth +
+		 min(depth & ((1U << sbq->sb.shift) - 1), shallow_depth));
+	wake_batch = clamp_t(unsigned int, depth / SBQ_WAIT_QUEUES, 1,
+			     SBQ_WAKE_BATCH);
 
 	return wake_batch;
 }
@@ -307,7 +322,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
 			*per_cpu_ptr(sbq->alloc_hint, i) = prandom_u32() % depth;
 	}
 
-	sbq->wake_batch = sbq_calc_wake_batch(depth);
+	sbq->min_shallow_depth = UINT_MAX;
+	sbq->wake_batch = sbq_calc_wake_batch(sbq, depth);
 	atomic_set(&sbq->wake_index, 0);
 
 	sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node);
@@ -327,21 +343,28 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_init_node);
 
-void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
+static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
+					    unsigned int depth)
 {
-	unsigned int wake_batch = sbq_calc_wake_batch(depth);
+	unsigned int wake_batch = sbq_calc_wake_batch(sbq, depth);
 	int i;
 
 	if (sbq->wake_batch != wake_batch) {
 		WRITE_ONCE(sbq->wake_batch, wake_batch);
 		/*
-		 * Pairs with the memory barrier in sbq_wake_up() to ensure that
-		 * the batch size is updated before the wait counts.
+		 * Pairs with the memory barrier in sbitmap_queue_wake_up()
+		 * to ensure that the batch size is updated before the wait
+		 * counts.
 		 */
 		smp_mb__before_atomic();
 		for (i = 0; i < SBQ_WAIT_QUEUES; i++)
 			atomic_set(&sbq->ws[i].wait_cnt, 1);
 	}
+}
+
+void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
+{
+	sbitmap_queue_update_wake_batch(sbq, depth);
 	sbitmap_resize(&sbq->sb, depth);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_resize);
@@ -380,6 +403,8 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
 	unsigned int hint, depth;
 	int nr;
 
+	WARN_ON_ONCE(shallow_depth < sbq->min_shallow_depth);
+
 	hint = this_cpu_read(*sbq->alloc_hint);
 	depth = READ_ONCE(sbq->sb.depth);
 	if (unlikely(hint >= depth)) {
@@ -403,6 +428,14 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
 }
 EXPORT_SYMBOL_GPL(__sbitmap_queue_get_shallow);
 
+void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
+				     unsigned int min_shallow_depth)
+{
+	sbq->min_shallow_depth = min_shallow_depth;
+	sbitmap_queue_update_wake_batch(sbq, sbq->sb.depth);
+}
+EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
+
 static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 {
 	int i, wake_index;
@@ -425,52 +458,67 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 	return NULL;
 }
 
-static void sbq_wake_up(struct sbitmap_queue *sbq)
+static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 {
 	struct sbq_wait_state *ws;
 	unsigned int wake_batch;
 	int wait_cnt;
 
-	/*
-	 * Pairs with the memory barrier in set_current_state() to ensure the
-	 * proper ordering of clear_bit()/waitqueue_active() in the waker and
-	 * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-	 * waiter. See the comment on waitqueue_active(). This is __after_atomic
-	 * because we just did clear_bit_unlock() in the caller.
-	 */
-	smp_mb__after_atomic();
-
 	ws = sbq_wake_ptr(sbq);
 	if (!ws)
-		return;
+		return false;
 
 	wait_cnt = atomic_dec_return(&ws->wait_cnt);
 	if (wait_cnt <= 0) {
+		int ret;
+
 		wake_batch = READ_ONCE(sbq->wake_batch);
+
 		/*
 		 * Pairs with the memory barrier in sbitmap_queue_resize() to
 		 * ensure that we see the batch size update before the wait
 		 * count is reset.
 		 */
 		smp_mb__before_atomic();
+
 		/*
-		 * If there are concurrent callers to sbq_wake_up(), the last
-		 * one to decrement the wait count below zero will bump it back
-		 * up. If there is a concurrent resize, the count reset will
-		 * either cause the cmpxchg to fail or overwrite after the
-		 * cmpxchg.
+		 * For concurrent callers of this, the one that failed the
+		 * atomic_cmpxhcg() race should call this function again
+		 * to wakeup a new batch on a different 'ws'.
 		 */
-		atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
-		sbq_index_atomic_inc(&sbq->wake_index);
-		wake_up_nr(&ws->wait, wake_batch);
+		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
+		if (ret == wait_cnt) {
+			sbq_index_atomic_inc(&sbq->wake_index);
+			wake_up_nr(&ws->wait, wake_batch);
+			return false;
+		}
+
+		return true;
 	}
+
+	return false;
+}
+
+void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
+{
+	while (__sbq_wake_up(sbq))
+		;
+}
+EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
 
 void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
 			 unsigned int cpu)
 {
 	sbitmap_clear_bit_unlock(&sbq->sb, nr);
-	sbq_wake_up(sbq);
+	/*
+	 * Pairs with the memory barrier in set_current_state() to ensure the
+	 * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+	 * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+	 * waiter. See the comment on waitqueue_active().
+	 */
+	smp_mb__after_atomic();
+	sbitmap_queue_wake_up(sbq);
+
 	if (likely(!sbq->round_robin && nr < sbq->sb.depth))
 		*per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
 }
@@ -482,7 +530,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq)
 
 	/*
 	 * Pairs with the memory barrier in set_current_state() like in
-	 * sbq_wake_up().
+	 * sbitmap_queue_wake_up().
 	 */
 	smp_mb();
 	wake_index = atomic_read(&sbq->wake_index);
@@ -528,5 +576,6 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m)
 	seq_puts(m, "}\n");
 
 	seq_printf(m, "round_robin=%d\n", sbq->round_robin);
+	seq_printf(m, "min_shallow_depth=%u\n", sbq->min_shallow_depth);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_show);
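
The new sbq_calc_wake_batch() comment above works through the batch-size arithmetic in words. The following is a minimal userspace sketch of the same calculation, handy for sanity-checking a configuration. It assumes SBQ_WAIT_QUEUES and SBQ_WAKE_BATCH are both 8 (their values in include/linux/sbitmap.h around the time of this diff); the function and constant names here are illustrative, not part of the kernel API.

#include <stdio.h>

#define WAIT_QUEUES	8U	/* assumed value of SBQ_WAIT_QUEUES */
#define WAKE_BATCH	8U	/* assumed value of SBQ_WAKE_BATCH */

static unsigned int min_u(unsigned int a, unsigned int b)
{
	return a < b ? a : b;
}

/* Mirrors the arithmetic in sbq_calc_wake_batch() above. */
static unsigned int calc_wake_batch(unsigned int depth, unsigned int shift,
				    unsigned int min_shallow_depth)
{
	/* Usable bits per word, capped by the shallow allocation limit. */
	unsigned int shallow = min_u(1U << shift, min_shallow_depth);
	/* Full words contribute 'shallow' bits each... */
	unsigned int usable = (depth >> shift) * shallow;

	/* ...plus whatever the partial word can still contribute. */
	usable += min_u(depth & ((1U << shift) - 1), shallow);

	/* clamp(usable / WAIT_QUEUES, 1, WAKE_BATCH) */
	if (usable / WAIT_QUEUES < 1U)
		return 1U;
	if (usable / WAIT_QUEUES > WAKE_BATCH)
		return WAKE_BATCH;
	return usable / WAIT_QUEUES;
}

int main(void)
{
	/*
	 * depth = 128, 64 bits per word, shallow allocations capped at 4 bits
	 * per word: only 8 usable bits in total, so the batch must shrink to 1
	 * or the 8 wait queues could never all be woken.
	 */
	printf("%u\n", calc_wake_batch(128, 6, 4));	/* prints 1 */

	/* No shallow limit (UINT_MAX, the init-time default): batch stays 8. */
	printf("%u\n", calc_wake_batch(128, 6, ~0U));	/* prints 8 */
	return 0;
}

The second call shows why sbitmap_queue_init_node() now starts min_shallow_depth at UINT_MAX: with no shallow limit the calculation reduces to the old clamp of depth / SBQ_WAIT_QUEUES.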
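A hedged usage sketch (not taken from this diff) of how a caller might pair the new sbitmap_queue_min_shallow_depth() with __sbitmap_queue_get_shallow() so the WARN_ON_ONCE() added above never fires. The depth, shift, shallow limit, and the example_* names are assumptions for illustration only.

/*
 * Hypothetical caller of the new API: a tag pool where "low priority"
 * allocations are limited to 4 bits per word via the shallow interface.
 */
#include <linux/sbitmap.h>
#include <linux/gfp.h>

#define EXAMPLE_SHALLOW_LIMIT	4U	/* assumption for the example */

static int example_init(struct sbitmap_queue *sbq, int node)
{
	int ret;

	/* depth 128, default shift, no round-robin hint. */
	ret = sbitmap_queue_init_node(sbq, 128, -1, false, GFP_KERNEL, node);
	if (ret)
		return ret;

	/*
	 * Declare the smallest shallow depth that will ever be passed to
	 * __sbitmap_queue_get_shallow(), so wake_batch is recomputed for it
	 * up front instead of staying at the full-depth value.
	 */
	sbitmap_queue_min_shallow_depth(sbq, EXAMPLE_SHALLOW_LIMIT);
	return 0;
}

static int example_get_low_prio(struct sbitmap_queue *sbq)
{
	/* Any shallow_depth >= the declared minimum avoids the WARN_ON_ONCE(). */
	return __sbitmap_queue_get_shallow(sbq, EXAMPLE_SHALLOW_LIMIT);
}

Declaring the minimum right after init matters because sbitmap_queue_min_shallow_depth() calls sbitmap_queue_update_wake_batch(); leaving the UINT_MAX default while still passing small shallow depths is exactly the missed-wakeup case this series addresses.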