diff options
Diffstat (limited to 'kernel/padata.c')
| -rw-r--r-- | kernel/padata.c | 307 | 
1 files changed, 130 insertions, 177 deletions
diff --git a/kernel/padata.c b/kernel/padata.c index 15a8ad63f4ff..c3fec1413295 100644 --- a/kernel/padata.c +++ b/kernel/padata.c @@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)  	return target_cpu;  } -static int padata_cpu_hash(struct parallel_data *pd) +static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)  { -	unsigned int seq_nr; -	int cpu_index; -  	/*  	 * Hash the sequence numbers to the cpus by taking  	 * seq_nr mod. number of cpus in use.  	 */ - -	seq_nr = atomic_inc_return(&pd->seq_nr); -	cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu); +	int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);  	return padata_index_to_cpu(pd, cpu_index);  } @@ -94,17 +89,19 @@ static void padata_parallel_worker(struct work_struct *parallel_work)   *   * @pinst: padata instance   * @padata: object to be parallelized - * @cb_cpu: cpu the serialization callback function will run on, - *          must be in the serial cpumask of padata(i.e. cpumask.cbcpu). + * @cb_cpu: pointer to the CPU that the serialization callback function should + *          run on.  If it's not in the serial cpumask of @pinst + *          (i.e. cpumask.cbcpu), this function selects a fallback CPU and if + *          none found, returns -EINVAL.   *   * The parallelization callback function will run with BHs off.   * Note: Every object which is parallelized by padata_do_parallel   * must be seen by padata_do_serial.   */  int padata_do_parallel(struct padata_instance *pinst, -		       struct padata_priv *padata, int cb_cpu) +		       struct padata_priv *padata, int *cb_cpu)  { -	int target_cpu, err; +	int i, cpu, cpu_index, target_cpu, err;  	struct padata_parallel_queue *queue;  	struct parallel_data *pd; @@ -116,8 +113,19 @@ int padata_do_parallel(struct padata_instance *pinst,  	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)  		goto out; -	if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu)) -		goto out; +	if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) { +		if (!cpumask_weight(pd->cpumask.cbcpu)) +			goto out; + +		/* Select an alternate fallback CPU and notify the caller. */ +		cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu); + +		cpu = cpumask_first(pd->cpumask.cbcpu); +		for (i = 0; i < cpu_index; i++) +			cpu = cpumask_next(cpu, pd->cpumask.cbcpu); + +		*cb_cpu = cpu; +	}  	err =  -EBUSY;  	if ((pinst->flags & PADATA_RESET)) @@ -129,9 +137,10 @@ int padata_do_parallel(struct padata_instance *pinst,  	err = 0;  	atomic_inc(&pd->refcnt);  	padata->pd = pd; -	padata->cb_cpu = cb_cpu; +	padata->cb_cpu = *cb_cpu; -	target_cpu = padata_cpu_hash(pd); +	padata->seq_nr = atomic_inc_return(&pd->seq_nr); +	target_cpu = padata_cpu_hash(pd, padata->seq_nr);  	padata->cpu = target_cpu;  	queue = per_cpu_ptr(pd->pqueue, target_cpu); @@ -139,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst,  	list_add_tail(&padata->list, &queue->parallel.list);  	spin_unlock(&queue->parallel.lock); -	queue_work_on(target_cpu, pinst->wq, &queue->work); +	queue_work(pinst->parallel_wq, &queue->work);  out:  	rcu_read_unlock_bh(); @@ -149,63 +158,53 @@ out:  EXPORT_SYMBOL(padata_do_parallel);  /* - * padata_get_next - Get the next object that needs serialization. + * padata_find_next - Find the next object that needs serialization.   *   * Return values are:   *   * A pointer to the control struct of the next object that needs   * serialization, if present in one of the percpu reorder queues.   * - * -EINPROGRESS, if the next object that needs serialization will + * NULL, if the next object that needs serialization will   *  be parallel processed by another cpu and is not yet present in   *  the cpu's reorder queue. - * - * -ENODATA, if this cpu has to do the parallel processing for - *  the next object.   */ -static struct padata_priv *padata_get_next(struct parallel_data *pd) +static struct padata_priv *padata_find_next(struct parallel_data *pd, +					    bool remove_object)  { -	int cpu, num_cpus; -	unsigned int next_nr, next_index;  	struct padata_parallel_queue *next_queue;  	struct padata_priv *padata;  	struct padata_list *reorder; +	int cpu = pd->cpu; -	num_cpus = cpumask_weight(pd->cpumask.pcpu); - -	/* -	 * Calculate the percpu reorder queue and the sequence -	 * number of the next object. -	 */ -	next_nr = pd->processed; -	next_index = next_nr % num_cpus; -	cpu = padata_index_to_cpu(pd, next_index);  	next_queue = per_cpu_ptr(pd->pqueue, cpu); -  	reorder = &next_queue->reorder;  	spin_lock(&reorder->lock); -	if (!list_empty(&reorder->list)) { -		padata = list_entry(reorder->list.next, -				    struct padata_priv, list); - -		list_del_init(&padata->list); -		atomic_dec(&pd->reorder_objects); +	if (list_empty(&reorder->list)) { +		spin_unlock(&reorder->lock); +		return NULL; +	} -		pd->processed++; +	padata = list_entry(reorder->list.next, struct padata_priv, list); +	/* +	 * Checks the rare case where two or more parallel jobs have hashed to +	 * the same CPU and one of the later ones finishes first. +	 */ +	if (padata->seq_nr != pd->processed) {  		spin_unlock(&reorder->lock); -		goto out; +		return NULL;  	} -	spin_unlock(&reorder->lock); -	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) { -		padata = ERR_PTR(-ENODATA); -		goto out; +	if (remove_object) { +		list_del_init(&padata->list); +		atomic_dec(&pd->reorder_objects); +		++pd->processed; +		pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false);  	} -	padata = ERR_PTR(-EINPROGRESS); -out: +	spin_unlock(&reorder->lock);  	return padata;  } @@ -215,6 +214,7 @@ static void padata_reorder(struct parallel_data *pd)  	struct padata_priv *padata;  	struct padata_serial_queue *squeue;  	struct padata_instance *pinst = pd->pinst; +	struct padata_parallel_queue *next_queue;  	/*  	 * We need to ensure that only one cpu can work on dequeueing of @@ -230,27 +230,16 @@ static void padata_reorder(struct parallel_data *pd)  		return;  	while (1) { -		padata = padata_get_next(pd); +		padata = padata_find_next(pd, true);  		/*  		 * If the next object that needs serialization is parallel  		 * processed by another cpu and is still on it's way to the  		 * cpu's reorder queue, nothing to do for now.  		 */ -		if (PTR_ERR(padata) == -EINPROGRESS) +		if (!padata)  			break; -		/* -		 * This cpu has to do the parallel processing of the next -		 * object. It's waiting in the cpu's parallelization queue, -		 * so exit immediately. -		 */ -		if (PTR_ERR(padata) == -ENODATA) { -			del_timer(&pd->timer); -			spin_unlock_bh(&pd->lock); -			return; -		} -  		cb_cpu = padata->cb_cpu;  		squeue = per_cpu_ptr(pd->squeue, cb_cpu); @@ -258,77 +247,37 @@ static void padata_reorder(struct parallel_data *pd)  		list_add_tail(&padata->list, &squeue->serial.list);  		spin_unlock(&squeue->serial.lock); -		queue_work_on(cb_cpu, pinst->wq, &squeue->work); +		queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work);  	}  	spin_unlock_bh(&pd->lock);  	/*  	 * The next object that needs serialization might have arrived to -	 * the reorder queues in the meantime, we will be called again -	 * from the timer function if no one else cares for it. +	 * the reorder queues in the meantime.  	 * -	 * Ensure reorder_objects is read after pd->lock is dropped so we see -	 * an increment from another task in padata_do_serial.  Pairs with +	 * Ensure reorder queue is read after pd->lock is dropped so we see +	 * new objects from another task in padata_do_serial.  Pairs with  	 * smp_mb__after_atomic in padata_do_serial.  	 */  	smp_mb(); -	if (atomic_read(&pd->reorder_objects) -			&& !(pinst->flags & PADATA_RESET)) -		mod_timer(&pd->timer, jiffies + HZ); -	else -		del_timer(&pd->timer); -	return; +	next_queue = per_cpu_ptr(pd->pqueue, pd->cpu); +	if (!list_empty(&next_queue->reorder.list) && +	    padata_find_next(pd, false)) +		queue_work(pinst->serial_wq, &pd->reorder_work);  }  static void invoke_padata_reorder(struct work_struct *work)  { -	struct padata_parallel_queue *pqueue;  	struct parallel_data *pd;  	local_bh_disable(); -	pqueue = container_of(work, struct padata_parallel_queue, reorder_work); -	pd = pqueue->pd; +	pd = container_of(work, struct parallel_data, reorder_work);  	padata_reorder(pd);  	local_bh_enable();  } -static void padata_reorder_timer(struct timer_list *t) -{ -	struct parallel_data *pd = from_timer(pd, t, timer); -	unsigned int weight; -	int target_cpu, cpu; - -	cpu = get_cpu(); - -	/* We don't lock pd here to not interfere with parallel processing -	 * padata_reorder() calls on other CPUs. We just need any CPU out of -	 * the cpumask.pcpu set. It would be nice if it's the right one but -	 * it doesn't matter if we're off to the next one by using an outdated -	 * pd->processed value. -	 */ -	weight = cpumask_weight(pd->cpumask.pcpu); -	target_cpu = padata_index_to_cpu(pd, pd->processed % weight); - -	/* ensure to call the reorder callback on the correct CPU */ -	if (cpu != target_cpu) { -		struct padata_parallel_queue *pqueue; -		struct padata_instance *pinst; - -		/* The timer function is serialized wrt itself -- no locking -		 * needed. -		 */ -		pinst = pd->pinst; -		pqueue = per_cpu_ptr(pd->pqueue, target_cpu); -		queue_work_on(target_cpu, pinst->wq, &pqueue->reorder_work); -	} else { -		padata_reorder(pd); -	} - -	put_cpu(); -} -  static void padata_serial_worker(struct work_struct *serial_work)  {  	struct padata_serial_queue *squeue; @@ -367,47 +316,28 @@ static void padata_serial_worker(struct work_struct *serial_work)   */  void padata_do_serial(struct padata_priv *padata)  { -	int cpu; -	struct padata_parallel_queue *pqueue; -	struct parallel_data *pd; -	int reorder_via_wq = 0; - -	pd = padata->pd; - -	cpu = get_cpu(); - -	/* We need to run on the same CPU padata_do_parallel(.., padata, ..) -	 * was called on -- or, at least, enqueue the padata object into the -	 * correct per-cpu queue. -	 */ -	if (cpu != padata->cpu) { -		reorder_via_wq = 1; -		cpu = padata->cpu; -	} - -	pqueue = per_cpu_ptr(pd->pqueue, cpu); +	struct parallel_data *pd = padata->pd; +	struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue, +							   padata->cpu); +	struct padata_priv *cur;  	spin_lock(&pqueue->reorder.lock); +	/* Sort in ascending order of sequence number. */ +	list_for_each_entry_reverse(cur, &pqueue->reorder.list, list) +		if (cur->seq_nr < padata->seq_nr) +			break; +	list_add(&padata->list, &cur->list);  	atomic_inc(&pd->reorder_objects); -	list_add_tail(&padata->list, &pqueue->reorder.list);  	spin_unlock(&pqueue->reorder.lock);  	/* -	 * Ensure the atomic_inc of reorder_objects above is ordered correctly +	 * Ensure the addition to the reorder list is ordered correctly  	 * with the trylock of pd->lock in padata_reorder.  Pairs with smp_mb  	 * in padata_reorder.  	 */  	smp_mb__after_atomic(); -	put_cpu(); - -	/* If we're running on the wrong CPU, call padata_reorder() via a -	 * kernel worker. -	 */ -	if (reorder_via_wq) -		queue_work_on(cpu, pd->pinst->wq, &pqueue->reorder_work); -	else -		padata_reorder(pd); +	padata_reorder(pd);  }  EXPORT_SYMBOL(padata_do_serial); @@ -415,17 +345,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd,  				 const struct cpumask *pcpumask,  				 const struct cpumask *cbcpumask)  { -	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) -		return -ENOMEM; +	struct workqueue_attrs *attrs; +	int err = -ENOMEM; +	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) +		goto out;  	cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask); -	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) { -		free_cpumask_var(pd->cpumask.pcpu); -		return -ENOMEM; -	} +	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) +		goto free_pcpu_mask;  	cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask); + +	attrs = alloc_workqueue_attrs(); +	if (!attrs) +		goto free_cbcpu_mask; + +	/* Restrict parallel_wq workers to pd->cpumask.pcpu. */ +	cpumask_copy(attrs->cpumask, pd->cpumask.pcpu); +	err = apply_workqueue_attrs(pd->pinst->parallel_wq, attrs); +	free_workqueue_attrs(attrs); +	if (err < 0) +		goto free_cbcpu_mask; +  	return 0; + +free_cbcpu_mask: +	free_cpumask_var(pd->cpumask.cbcpu); +free_pcpu_mask: +	free_cpumask_var(pd->cpumask.pcpu); +out: +	return err;  }  static void __padata_list_init(struct padata_list *pd_list) @@ -451,26 +400,15 @@ static void padata_init_squeues(struct parallel_data *pd)  /* Initialize all percpu queues used by parallel workers */  static void padata_init_pqueues(struct parallel_data *pd)  { -	int cpu_index, cpu; +	int cpu;  	struct padata_parallel_queue *pqueue; -	cpu_index = 0; -	for_each_possible_cpu(cpu) { +	for_each_cpu(cpu, pd->cpumask.pcpu) {  		pqueue = per_cpu_ptr(pd->pqueue, cpu); -		if (!cpumask_test_cpu(cpu, pd->cpumask.pcpu)) { -			pqueue->cpu_index = -1; -			continue; -		} - -		pqueue->pd = pd; -		pqueue->cpu_index = cpu_index; -		cpu_index++; -  		__padata_list_init(&pqueue->reorder);  		__padata_list_init(&pqueue->parallel);  		INIT_WORK(&pqueue->work, padata_parallel_worker); -		INIT_WORK(&pqueue->reorder_work, invoke_padata_reorder);  		atomic_set(&pqueue->num_obj, 0);  	}  } @@ -493,17 +431,19 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,  	pd->squeue = alloc_percpu(struct padata_serial_queue);  	if (!pd->squeue)  		goto err_free_pqueue; + +	pd->pinst = pinst;  	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)  		goto err_free_squeue;  	padata_init_pqueues(pd);  	padata_init_squeues(pd); -	timer_setup(&pd->timer, padata_reorder_timer, 0);  	atomic_set(&pd->seq_nr, -1);  	atomic_set(&pd->reorder_objects, 0);  	atomic_set(&pd->refcnt, 0); -	pd->pinst = pinst;  	spin_lock_init(&pd->lock); +	pd->cpu = cpumask_first(pd->cpumask.pcpu); +	INIT_WORK(&pd->reorder_work, invoke_padata_reorder);  	return pd; @@ -538,8 +478,6 @@ static void padata_flush_queues(struct parallel_data *pd)  		flush_work(&pqueue->work);  	} -	del_timer_sync(&pd->timer); -  	if (atomic_read(&pd->reorder_objects))  		padata_reorder(pd); @@ -883,6 +821,8 @@ static void __padata_free(struct padata_instance *pinst)  	padata_free_pd(pinst->pd);  	free_cpumask_var(pinst->cpumask.pcpu);  	free_cpumask_var(pinst->cpumask.cbcpu); +	destroy_workqueue(pinst->serial_wq); +	destroy_workqueue(pinst->parallel_wq);  	kfree(pinst);  } @@ -1016,13 +956,11 @@ static struct kobj_type padata_attr_type = {   * padata_alloc - allocate and initialize a padata instance and specify   *                cpumasks for serial and parallel workers.   * - * @wq: workqueue to use for the allocated padata instance + * @name: used to identify the instance   * @pcpumask: cpumask that will be used for padata parallelization   * @cbcpumask: cpumask that will be used for padata serialization - * - * Must be called from a cpus_read_lock() protected region   */ -static struct padata_instance *padata_alloc(struct workqueue_struct *wq, +static struct padata_instance *padata_alloc(const char *name,  					    const struct cpumask *pcpumask,  					    const struct cpumask *cbcpumask)  { @@ -1033,11 +971,23 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,  	if (!pinst)  		goto err; -	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL)) +	pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0, +					     name); +	if (!pinst->parallel_wq)  		goto err_free_inst; + +	get_online_cpus(); + +	pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM | +					   WQ_CPU_INTENSIVE, 1, name); +	if (!pinst->serial_wq) +		goto err_put_cpus; + +	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL)) +		goto err_free_serial_wq;  	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {  		free_cpumask_var(pinst->cpumask.pcpu); -		goto err_free_inst; +		goto err_free_serial_wq;  	}  	if (!padata_validate_cpumask(pinst, pcpumask) ||  	    !padata_validate_cpumask(pinst, cbcpumask)) @@ -1049,8 +999,6 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,  	rcu_assign_pointer(pinst->pd, pd); -	pinst->wq = wq; -  	cpumask_copy(pinst->cpumask.pcpu, pcpumask);  	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask); @@ -1063,11 +1011,19 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,  #ifdef CONFIG_HOTPLUG_CPU  	cpuhp_state_add_instance_nocalls_cpuslocked(hp_online, &pinst->node);  #endif + +	put_online_cpus(); +  	return pinst;  err_free_masks:  	free_cpumask_var(pinst->cpumask.pcpu);  	free_cpumask_var(pinst->cpumask.cbcpu); +err_free_serial_wq: +	destroy_workqueue(pinst->serial_wq); +err_put_cpus: +	put_online_cpus(); +	destroy_workqueue(pinst->parallel_wq);  err_free_inst:  	kfree(pinst);  err: @@ -1079,14 +1035,11 @@ err:   *                         Use the cpu_possible_mask for serial and   *                         parallel workers.   * - * @wq: workqueue to use for the allocated padata instance - * - * Must be called from a cpus_read_lock() protected region + * @name: used to identify the instance   */ -struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq) +struct padata_instance *padata_alloc_possible(const char *name)  { -	lockdep_assert_cpus_held(); -	return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask); +	return padata_alloc(name, cpu_possible_mask, cpu_possible_mask);  }  EXPORT_SYMBOL(padata_alloc_possible);  |