Diffstat (limited to 'kernel/rcu/tree_plugin.h')
 -rw-r--r--  kernel/rcu/tree_plugin.h | 1195
 1 file changed, 712 insertions(+), 483 deletions(-)
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h index acb225023ed1..2defc7fe74c3 100644 --- a/kernel/rcu/tree_plugin.h +++ b/kernel/rcu/tree_plugin.h @@ -288,7 +288,6 @@ void rcu_note_context_switch(bool preempt)  	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);  	struct rcu_node *rnp; -	barrier(); /* Avoid RCU read-side critical sections leaking down. */  	trace_rcu_utilization(TPS("Start context switch"));  	lockdep_assert_irqs_disabled();  	WARN_ON_ONCE(!preempt && t->rcu_read_lock_nesting > 0); @@ -314,15 +313,6 @@ void rcu_note_context_switch(bool preempt)  				       ? rnp->gp_seq  				       : rcu_seq_snap(&rnp->gp_seq));  		rcu_preempt_ctxt_queue(rnp, rdp); -	} else if (t->rcu_read_lock_nesting < 0 && -		   t->rcu_read_unlock_special.s) { - -		/* -		 * Complete exit from RCU read-side critical section on -		 * behalf of preempted instance of __rcu_read_unlock(). -		 */ -		rcu_read_unlock_special(t); -		rcu_preempt_deferred_qs(t);  	} else {  		rcu_preempt_deferred_qs(t);  	} @@ -340,7 +330,6 @@ void rcu_note_context_switch(bool preempt)  	if (rdp->exp_deferred_qs)  		rcu_report_exp_rdp(rdp);  	trace_rcu_utilization(TPS("End context switch")); -	barrier(); /* Avoid RCU read-side critical sections leaking up. */  }  EXPORT_SYMBOL_GPL(rcu_note_context_switch); @@ -626,22 +615,18 @@ static void rcu_read_unlock_special(struct task_struct *t)  		      (rdp->grpmask & rnp->expmask) ||  		      tick_nohz_full_cpu(rdp->cpu);  		// Need to defer quiescent state until everything is enabled. -		if ((exp || in_irq()) && irqs_were_disabled && use_softirq && -		    (in_irq() || !t->rcu_read_unlock_special.b.deferred_qs)) { +		if (irqs_were_disabled && use_softirq && +		    (in_interrupt() || +		     (exp && !t->rcu_read_unlock_special.b.deferred_qs))) {  			// Using softirq, safe to awaken, and we get  			// no help from enabling irqs, unlike bh/preempt.  			raise_softirq_irqoff(RCU_SOFTIRQ); -		} else if (exp && irqs_were_disabled && !use_softirq && -			   !t->rcu_read_unlock_special.b.deferred_qs) { -			// Safe to awaken and we get no help from enabling -			// irqs, unlike bh/preempt. -			invoke_rcu_core();  		} else {  			// Enabling BH or preempt does reschedule, so...  			// Also if no expediting or NO_HZ_FULL, slow is OK.  			set_tsk_need_resched(current);  			set_preempt_need_resched(); -			if (IS_ENABLED(CONFIG_IRQ_WORK) && +			if (IS_ENABLED(CONFIG_IRQ_WORK) && irqs_were_disabled &&  			    !rdp->defer_qs_iw_pending && exp) {  				// Get scheduler to re-evaluate and call hooks.  				// If !IRQ_WORK, FQS scan will eventually IPI. @@ -828,11 +813,6 @@ static void rcu_qs(void)   * dyntick-idle quiescent state visible to other CPUs, which will in   * some cases serve for expedited as well as normal grace periods.   * Either way, register a lightweight quiescent state. - * - * The barrier() calls are redundant in the common case when this is - * called externally, but just in case this is called from within this - * file. - *   */  void rcu_all_qs(void)  { @@ -847,14 +827,12 @@ void rcu_all_qs(void)  		return;  	}  	this_cpu_write(rcu_data.rcu_urgent_qs, false); -	barrier(); /* Avoid RCU read-side critical sections leaking down. */  	if (unlikely(raw_cpu_read(rcu_data.rcu_need_heavy_qs))) {  		local_irq_save(flags);  		rcu_momentary_dyntick_idle();  		local_irq_restore(flags);  	}  	rcu_qs(); -	barrier(); /* Avoid RCU read-side critical sections leaking up. 
*/  	preempt_enable();  }  EXPORT_SYMBOL_GPL(rcu_all_qs); @@ -864,7 +842,6 @@ EXPORT_SYMBOL_GPL(rcu_all_qs);   */  void rcu_note_context_switch(bool preempt)  { -	barrier(); /* Avoid RCU read-side critical sections leaking down. */  	trace_rcu_utilization(TPS("Start context switch"));  	rcu_qs();  	/* Load rcu_urgent_qs before other flags. */ @@ -877,7 +854,6 @@ void rcu_note_context_switch(bool preempt)  		rcu_tasks_qs(current);  out:  	trace_rcu_utilization(TPS("End context switch")); -	barrier(); /* Avoid RCU read-side critical sections leaking up. */  }  EXPORT_SYMBOL_GPL(rcu_note_context_switch); @@ -1134,7 +1110,7 @@ static void rcu_preempt_boost_start_gp(struct rcu_node *rnp)   * already exist.  We only create this kthread for preemptible RCU.   * Returns zero if all is well, a negated errno otherwise.   */ -static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp) +static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)  {  	int rnp_index = rnp - rcu_get_root();  	unsigned long flags; @@ -1142,25 +1118,27 @@ static int rcu_spawn_one_boost_kthread(struct rcu_node *rnp)  	struct task_struct *t;  	if (!IS_ENABLED(CONFIG_PREEMPT_RCU)) -		return 0; +		return;  	if (!rcu_scheduler_fully_active || rcu_rnp_online_cpus(rnp) == 0) -		return 0; +		return;  	rcu_state.boost = 1; +  	if (rnp->boost_kthread_task != NULL) -		return 0; +		return; +  	t = kthread_create(rcu_boost_kthread, (void *)rnp,  			   "rcub/%d", rnp_index); -	if (IS_ERR(t)) -		return PTR_ERR(t); +	if (WARN_ON_ONCE(IS_ERR(t))) +		return; +  	raw_spin_lock_irqsave_rcu_node(rnp, flags);  	rnp->boost_kthread_task = t;  	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);  	sp.sched_priority = kthread_prio;  	sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);  	wake_up_process(t); /* get to TASK_INTERRUPTIBLE quickly. */ -	return 0;  }  /* @@ -1201,7 +1179,7 @@ static void __init rcu_spawn_boost_kthreads(void)  	struct rcu_node *rnp;  	rcu_for_each_leaf_node(rnp) -		(void)rcu_spawn_one_boost_kthread(rnp); +		rcu_spawn_one_boost_kthread(rnp);  }  static void rcu_prepare_kthreads(int cpu) @@ -1211,7 +1189,7 @@ static void rcu_prepare_kthreads(int cpu)  	/* Fire up the incoming CPU's kthread and leaf rcu_node kthread. */  	if (rcu_scheduler_fully_active) -		(void)rcu_spawn_one_boost_kthread(rnp); +		rcu_spawn_one_boost_kthread(rnp);  }  #else /* #ifdef CONFIG_RCU_BOOST */ @@ -1248,10 +1226,10 @@ static void rcu_prepare_kthreads(int cpu)  #if !defined(CONFIG_RCU_FAST_NO_HZ)  /* - * Check to see if any future RCU-related work will need to be done - * by the current CPU, even if none need be done immediately, returning - * 1 if so.  This function is part of the RCU implementation; it is -not- - * an exported member of the RCU API. + * Check to see if any future non-offloaded RCU-related work will need + * to be done by the current CPU, even if none need be done immediately, + * returning 1 if so.  This function is part of the RCU implementation; + * it is -not- an exported member of the RCU API.   *   * Because we not have RCU_FAST_NO_HZ, just check whether or not this   * CPU has RCU callbacks queued. 
@@ -1259,7 +1237,8 @@ static void rcu_prepare_kthreads(int cpu)  int rcu_needs_cpu(u64 basemono, u64 *nextevt)  {  	*nextevt = KTIME_MAX; -	return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist); +	return !rcu_segcblist_empty(&this_cpu_ptr(&rcu_data)->cblist) && +	       !rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist);  }  /* @@ -1360,8 +1339,9 @@ int rcu_needs_cpu(u64 basemono, u64 *nextevt)  	lockdep_assert_irqs_disabled(); -	/* If no callbacks, RCU doesn't need the CPU. */ -	if (rcu_segcblist_empty(&rdp->cblist)) { +	/* If no non-offloaded callbacks, RCU doesn't need the CPU. */ +	if (rcu_segcblist_empty(&rdp->cblist) || +	    rcu_segcblist_is_offloaded(&this_cpu_ptr(&rcu_data)->cblist)) {  		*nextevt = KTIME_MAX;  		return 0;  	} @@ -1404,7 +1384,7 @@ static void rcu_prepare_for_idle(void)  	int tne;  	lockdep_assert_irqs_disabled(); -	if (rcu_is_nocb_cpu(smp_processor_id())) +	if (rcu_segcblist_is_offloaded(&rdp->cblist))  		return;  	/* Handle nohz enablement switches conservatively. */ @@ -1453,8 +1433,10 @@ static void rcu_prepare_for_idle(void)   */  static void rcu_cleanup_after_idle(void)  { +	struct rcu_data *rdp = this_cpu_ptr(&rcu_data); +  	lockdep_assert_irqs_disabled(); -	if (rcu_is_nocb_cpu(smp_processor_id())) +	if (rcu_segcblist_is_offloaded(&rdp->cblist))  		return;  	if (rcu_try_advance_all_cbs())  		invoke_rcu_core(); @@ -1469,10 +1451,10 @@ static void rcu_cleanup_after_idle(void)   * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads   * created that pull the callbacks from the corresponding CPU, wait for   * a grace period to elapse, and invoke the callbacks.  These kthreads - * are organized into leaders, which manage incoming callbacks, wait for - * grace periods, and awaken followers, and the followers, which only - * invoke callbacks.  Each leader is its own follower.  The no-CBs CPUs - * do a wake_up() on their kthread when they insert a callback into any + * are organized into GP kthreads, which manage incoming callbacks, wait for + * grace periods, and awaken CB kthreads, and the CB kthreads, which only + * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs + * do a wake_up() on their GP kthread when they insert a callback into any   * empty list, unless the rcu_nocb_poll boot parameter has been specified,   * in which case each kthread actively polls its CPU.  (Which isn't so great   * for energy efficiency, but which does reduce RCU's overhead on that CPU.) @@ -1515,6 +1497,116 @@ static int __init parse_rcu_nocb_poll(char *arg)  early_param("rcu_nocb_poll", parse_rcu_nocb_poll);  /* + * Don't bother bypassing ->cblist if the call_rcu() rate is low. + * After all, the main point of bypassing is to avoid lock contention + * on ->nocb_lock, which only can happen at high call_rcu() rates. + */ +int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ; +module_param(nocb_nobypass_lim_per_jiffy, int, 0); + +/* + * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the + * lock isn't immediately available, increment ->nocb_lock_contended to + * flag the contention. + */ +static void rcu_nocb_bypass_lock(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled(); +	if (raw_spin_trylock(&rdp->nocb_bypass_lock)) +		return; +	atomic_inc(&rdp->nocb_lock_contended); +	WARN_ON_ONCE(smp_processor_id() != rdp->cpu); +	smp_mb__after_atomic(); /* atomic_inc() before lock. */ +	raw_spin_lock(&rdp->nocb_bypass_lock); +	smp_mb__before_atomic(); /* atomic_dec() after lock. 
*/ +	atomic_dec(&rdp->nocb_lock_contended); +} + +/* + * Spinwait until the specified rcu_data structure's ->nocb_lock is + * not contended.  Please note that this is extremely special-purpose, + * relying on the fact that at most two kthreads and one CPU contend for + * this lock, and also that the two kthreads are guaranteed to have frequent + * grace-period-duration time intervals between successive acquisitions + * of the lock.  This allows us to use an extremely simple throttling + * mechanism, and further to apply it only to the CPU doing floods of + * call_rcu() invocations.  Don't try this at home! + */ +static void rcu_nocb_wait_contended(struct rcu_data *rdp) +{ +	WARN_ON_ONCE(smp_processor_id() != rdp->cpu); +	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended))) +		cpu_relax(); +} + +/* + * Conditionally acquire the specified rcu_data structure's + * ->nocb_bypass_lock. + */ +static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled(); +	return raw_spin_trylock(&rdp->nocb_bypass_lock); +} + +/* + * Release the specified rcu_data structure's ->nocb_bypass_lock. + */ +static void rcu_nocb_bypass_unlock(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled(); +	raw_spin_unlock(&rdp->nocb_bypass_lock); +} + +/* + * Acquire the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled(); +	if (!rcu_segcblist_is_offloaded(&rdp->cblist)) +		return; +	raw_spin_lock(&rdp->nocb_lock); +} + +/* + * Release the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ +	if (rcu_segcblist_is_offloaded(&rdp->cblist)) { +		lockdep_assert_irqs_disabled(); +		raw_spin_unlock(&rdp->nocb_lock); +	} +} + +/* + * Release the specified rcu_data structure's ->nocb_lock and restore + * interrupts, but only if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, +				       unsigned long flags) +{ +	if (rcu_segcblist_is_offloaded(&rdp->cblist)) { +		lockdep_assert_irqs_disabled(); +		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +	} else { +		local_irq_restore(flags); +	} +} + +/* Lockdep check that ->cblist may be safely accessed. */ +static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled(); +	if (rcu_segcblist_is_offloaded(&rdp->cblist) && +	    cpu_online(rdp->cpu)) +		lockdep_assert_held(&rdp->nocb_lock); +} + +/*   * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended   * grace period.   */ @@ -1543,440 +1635,514 @@ bool rcu_is_nocb_cpu(int cpu)  }  /* - * Kick the leader kthread for this NOCB group.  Caller holds ->nocb_lock + * Kick the GP kthread for this NOCB group.  Caller holds ->nocb_lock   * and this function releases it.   
*/ -static void __wake_nocb_leader(struct rcu_data *rdp, bool force, -			       unsigned long flags) +static void wake_nocb_gp(struct rcu_data *rdp, bool force, +			   unsigned long flags)  	__releases(rdp->nocb_lock)  { -	struct rcu_data *rdp_leader = rdp->nocb_leader; +	bool needwake = false; +	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;  	lockdep_assert_held(&rdp->nocb_lock); -	if (!READ_ONCE(rdp_leader->nocb_kthread)) { -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) { +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +				    TPS("AlreadyAwake")); +		rcu_nocb_unlock_irqrestore(rdp, flags);  		return;  	} -	if (rdp_leader->nocb_leader_sleep || force) { -		/* Prior smp_mb__after_atomic() orders against prior enqueue. */ -		WRITE_ONCE(rdp_leader->nocb_leader_sleep, false); -		del_timer(&rdp->nocb_timer); -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); -		smp_mb(); /* ->nocb_leader_sleep before swake_up_one(). */ -		swake_up_one(&rdp_leader->nocb_wq); -	} else { -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +	del_timer(&rdp->nocb_timer); +	rcu_nocb_unlock_irqrestore(rdp, flags); +	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); +	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) { +		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); +		needwake = true; +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));  	} +	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); +	if (needwake) +		wake_up_process(rdp_gp->nocb_gp_kthread);  }  /* - * Kick the leader kthread for this NOCB group, but caller has not - * acquired locks. + * Arrange to wake the GP kthread for this NOCB group at some future + * time when it is safe to do so.   */ -static void wake_nocb_leader(struct rcu_data *rdp, bool force) +static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, +			       const char *reason)  { -	unsigned long flags; +	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) +		mod_timer(&rdp->nocb_timer, jiffies + 1); +	if (rdp->nocb_defer_wakeup < waketype) +		WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); +	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); +} + +/* + * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. + * However, if there is a callback to be enqueued and if ->nocb_bypass + * proves to be initially empty, just return false because the no-CB GP + * kthread may need to be awakened in this case. + * + * Note that this function always returns true if rhp is NULL. + */ +static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, +				     unsigned long j) +{ +	struct rcu_cblist rcl; -	raw_spin_lock_irqsave(&rdp->nocb_lock, flags); -	__wake_nocb_leader(rdp, force, flags); +	WARN_ON_ONCE(!rcu_segcblist_is_offloaded(&rdp->cblist)); +	rcu_lockdep_assert_cblist_protected(rdp); +	lockdep_assert_held(&rdp->nocb_bypass_lock); +	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) { +		raw_spin_unlock(&rdp->nocb_bypass_lock); +		return false; +	} +	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ +	if (rhp) +		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ +	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); +	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); +	WRITE_ONCE(rdp->nocb_bypass_first, j); +	rcu_nocb_bypass_unlock(rdp); +	return true;  }  /* - * Arrange to wake the leader kthread for this NOCB group at some - * future time when it is safe to do so. + * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. 
+ * However, if there is a callback to be enqueued and if ->nocb_bypass + * proves to be initially empty, just return false because the no-CB GP + * kthread may need to be awakened in this case. + * + * Note that this function always returns true if rhp is NULL.   */ -static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype, -				   const char *reason) +static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, +				  unsigned long j)  { -	unsigned long flags; +	if (!rcu_segcblist_is_offloaded(&rdp->cblist)) +		return true; +	rcu_lockdep_assert_cblist_protected(rdp); +	rcu_nocb_bypass_lock(rdp); +	return rcu_nocb_do_flush_bypass(rdp, rhp, j); +} -	raw_spin_lock_irqsave(&rdp->nocb_lock, flags); -	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) -		mod_timer(&rdp->nocb_timer, jiffies + 1); -	WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); -	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); -	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +/* + * If the ->nocb_bypass_lock is immediately available, flush the + * ->nocb_bypass queue into ->cblist. + */ +static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) +{ +	rcu_lockdep_assert_cblist_protected(rdp); +	if (!rcu_segcblist_is_offloaded(&rdp->cblist) || +	    !rcu_nocb_bypass_trylock(rdp)) +		return; +	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));  } -/* Does rcu_barrier need to queue an RCU callback on the specified CPU?  */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* + * See whether it is appropriate to use the ->nocb_bypass list in order + * to control contention on ->nocb_lock.  A limited number of direct + * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass + * is non-empty, further callbacks must be placed into ->nocb_bypass, + * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch + * back to direct use of ->cblist.  However, ->nocb_bypass should not be + * used if ->cblist is empty, because otherwise callbacks can be stranded + * on ->nocb_bypass because we cannot count on the current CPU ever again + * invoking call_rcu().  The general rule is that if ->nocb_bypass is + * non-empty, the corresponding no-CBs grace-period kthread must not be + * in an indefinite sleep state. + * + * Finally, it is not permitted to use the bypass during early boot, + * as doing so would confuse the auto-initialization code.  Besides + * which, there is no point in worrying about lock contention while + * there is only one CPU in operation. + */ +static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, +				bool *was_alldone, unsigned long flags)  { -	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); -	unsigned long ret; -#ifdef CONFIG_PROVE_RCU -	struct rcu_head *rhp; -#endif /* #ifdef CONFIG_PROVE_RCU */ +	unsigned long c; +	unsigned long cur_gp_seq; +	unsigned long j = jiffies; +	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); -	/* -	 * Check count of all no-CBs callbacks awaiting invocation. -	 * There needs to be a barrier before this function is called, -	 * but associated with a prior determination that no more -	 * callbacks would be posted.  In the worst case, the first -	 * barrier in rcu_barrier() suffices (but the caller cannot -	 * necessarily rely on this, not a substitute for the caller -	 * getting the concurrency design right!).  There must also be a -	 * barrier between the following load and posting of a callback -	 * (if a callback is in fact needed).  
This is associated with an -	 * atomic_inc() in the caller. -	 */ -	ret = rcu_get_n_cbs_nocb_cpu(rdp); - -#ifdef CONFIG_PROVE_RCU -	rhp = READ_ONCE(rdp->nocb_head); -	if (!rhp) -		rhp = READ_ONCE(rdp->nocb_gp_head); -	if (!rhp) -		rhp = READ_ONCE(rdp->nocb_follower_head); - -	/* Having no rcuo kthread but CBs after scheduler starts is bad! */ -	if (!READ_ONCE(rdp->nocb_kthread) && rhp && -	    rcu_scheduler_fully_active) { -		/* RCU callback enqueued before CPU first came online??? */ -		pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n", -		       cpu, rhp->func); -		WARN_ON_ONCE(1); +	if (!rcu_segcblist_is_offloaded(&rdp->cblist)) { +		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); +		return false; /* Not offloaded, no bypassing. */ +	} +	lockdep_assert_irqs_disabled(); + +	// Don't use ->nocb_bypass during early boot. +	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) { +		rcu_nocb_lock(rdp); +		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); +		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); +		return false; +	} + +	// If we have advanced to a new jiffy, reset counts to allow +	// moving back from ->nocb_bypass to ->cblist. +	if (j == rdp->nocb_nobypass_last) { +		c = rdp->nocb_nobypass_count + 1; +	} else { +		WRITE_ONCE(rdp->nocb_nobypass_last, j); +		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy; +		if (ULONG_CMP_LT(rdp->nocb_nobypass_count, +				 nocb_nobypass_lim_per_jiffy)) +			c = 0; +		else if (c > nocb_nobypass_lim_per_jiffy) +			c = nocb_nobypass_lim_per_jiffy; +	} +	WRITE_ONCE(rdp->nocb_nobypass_count, c); + +	// If there hasn't yet been all that many ->cblist enqueues +	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush +	// ->nocb_bypass first. +	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { +		rcu_nocb_lock(rdp); +		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); +		if (*was_alldone) +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +					    TPS("FirstQ")); +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); +		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); +		return false; // Caller must enqueue the callback. +	} + +	// If ->nocb_bypass has been used too long or is too full, +	// flush ->nocb_bypass to ->cblist. +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || +	    ncbs >= qhimark) { +		rcu_nocb_lock(rdp); +		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { +			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); +			if (*was_alldone) +				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +						    TPS("FirstQ")); +			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); +			return false; // Caller must enqueue the callback. +		} +		if (j != rdp->nocb_gp_adv_time && +		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && +		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { +			rcu_advance_cbs_nowake(rdp->mynode, rdp); +			rdp->nocb_gp_adv_time = j; +		} +		rcu_nocb_unlock_irqrestore(rdp, flags); +		return true; // Callback already enqueued.  	} -#endif /* #ifdef CONFIG_PROVE_RCU */ -	return !!ret; +	// We need to use the bypass. +	rcu_nocb_wait_contended(rdp); +	rcu_nocb_bypass_lock(rdp); +	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); +	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ +	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); +	if (!ncbs) { +		WRITE_ONCE(rdp->nocb_bypass_first, j); +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); +	} +	rcu_nocb_bypass_unlock(rdp); +	smp_mb(); /* Order enqueue before wake. 
*/ +	if (ncbs) { +		local_irq_restore(flags); +	} else { +		// No-CBs GP kthread might be indefinitely asleep, if so, wake. +		rcu_nocb_lock(rdp); // Rare during call_rcu() flood. +		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) { +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +					    TPS("FirstBQwake")); +			__call_rcu_nocb_wake(rdp, true, flags); +		} else { +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +					    TPS("FirstBQnoWake")); +			rcu_nocb_unlock_irqrestore(rdp, flags); +		} +	} +	return true; // Callback already enqueued.  }  /* - * Enqueue the specified string of rcu_head structures onto the specified - * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the - * string by rhp, and the tail of the string by rhtp.  The non-lazy/lazy - * counts are supplied by rhcount and rhcount_lazy. + * Awaken the no-CBs grace-period kthead if needed, either due to it + * legitimately being asleep or due to overload conditions.   *   * If warranted, also wake up the kthread servicing this CPUs queues.   */ -static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, -				    struct rcu_head *rhp, -				    struct rcu_head **rhtp, -				    int rhcount, int rhcount_lazy, -				    unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, +				 unsigned long flags) +				 __releases(rdp->nocb_lock)  { -	int len; -	struct rcu_head **old_rhpp; +	unsigned long cur_gp_seq; +	unsigned long j; +	long len;  	struct task_struct *t; -	/* Enqueue the callback on the nocb list and update counts. */ -	atomic_long_add(rhcount, &rdp->nocb_q_count); -	/* rcu_barrier() relies on ->nocb_q_count add before xchg. */ -	old_rhpp = xchg(&rdp->nocb_tail, rhtp); -	WRITE_ONCE(*old_rhpp, rhp); -	atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy); -	smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */ - -	/* If we are not being polled and there is a kthread, awaken it ... */ -	t = READ_ONCE(rdp->nocb_kthread); +	// If we are being polled or there is no kthread, just leave. +	t = READ_ONCE(rdp->nocb_gp_kthread);  	if (rcu_nocb_poll || !t) {  		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,  				    TPS("WakeNotPoll")); +		rcu_nocb_unlock_irqrestore(rdp, flags);  		return;  	} -	len = rcu_get_n_cbs_nocb_cpu(rdp); -	if (old_rhpp == &rdp->nocb_head) { +	// Need to actually to a wakeup. +	len = rcu_segcblist_n_cbs(&rdp->cblist); +	if (was_alldone) { +		rdp->qlen_last_fqs_check = len;  		if (!irqs_disabled_flags(flags)) {  			/* ... if queue was empty ... */ -			wake_nocb_leader(rdp, false); +			wake_nocb_gp(rdp, false, flags);  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,  					    TPS("WakeEmpty"));  		} else { -			wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE, -					       TPS("WakeEmptyIsDeferred")); +			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, +					   TPS("WakeEmptyIsDeferred")); +			rcu_nocb_unlock_irqrestore(rdp, flags);  		} -		rdp->qlen_last_fqs_check = 0;  	} else if (len > rdp->qlen_last_fqs_check + qhimark) {  		/* ... or if many callbacks queued. 
*/ -		if (!irqs_disabled_flags(flags)) { -			wake_nocb_leader(rdp, true); -			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, -					    TPS("WakeOvf")); -		} else { -			wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE_FORCE, -					       TPS("WakeOvfIsDeferred")); +		rdp->qlen_last_fqs_check = len; +		j = jiffies; +		if (j != rdp->nocb_gp_adv_time && +		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && +		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { +			rcu_advance_cbs_nowake(rdp->mynode, rdp); +			rdp->nocb_gp_adv_time = j;  		} -		rdp->qlen_last_fqs_check = LONG_MAX / 2; +		smp_mb(); /* Enqueue before timer_pending(). */ +		if ((rdp->nocb_cb_sleep || +		     !rcu_segcblist_ready_cbs(&rdp->cblist)) && +		    !timer_pending(&rdp->nocb_bypass_timer)) +			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, +					   TPS("WakeOvfIsDeferred")); +		rcu_nocb_unlock_irqrestore(rdp, flags);  	} else {  		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); +		rcu_nocb_unlock_irqrestore(rdp, flags);  	}  	return;  } -/* - * This is a helper for __call_rcu(), which invokes this when the normal - * callback queue is inoperable.  If this is not a no-CBs CPU, this - * function returns failure back to __call_rcu(), which can complain - * appropriately. - * - * Otherwise, this function queues the callback where the corresponding - * "rcuo" kthread can find it. - */ -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, -			    bool lazy, unsigned long flags) +/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */ +static void do_nocb_bypass_wakeup_timer(struct timer_list *t)  { +	unsigned long flags; +	struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer); -	if (!rcu_is_nocb_cpu(rdp->cpu)) -		return false; -	__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); -	if (__is_kfree_rcu_offset((unsigned long)rhp->func)) -		trace_rcu_kfree_callback(rcu_state.name, rhp, -					 (unsigned long)rhp->func, -					 -atomic_long_read(&rdp->nocb_q_count_lazy), -					 -rcu_get_n_cbs_nocb_cpu(rdp)); -	else -		trace_rcu_callback(rcu_state.name, rhp, -				   -atomic_long_read(&rdp->nocb_q_count_lazy), -				   -rcu_get_n_cbs_nocb_cpu(rdp)); - -	/* -	 * If called from an extended quiescent state with interrupts -	 * disabled, invoke the RCU core in order to allow the idle-entry -	 * deferred-wakeup check to function. -	 */ -	if (irqs_disabled_flags(flags) && -	    !rcu_is_watching() && -	    cpu_online(smp_processor_id())) -		invoke_rcu_core(); - -	return true; -} - -/* - * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is - * not a no-CBs CPU. - */ -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, -						     struct rcu_data *rdp, -						     unsigned long flags) -{ -	lockdep_assert_irqs_disabled(); -	if (!rcu_is_nocb_cpu(smp_processor_id())) -		return false; /* Not NOCBs CPU, caller must migrate CBs. */ -	__call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist), -				rcu_segcblist_tail(&rdp->cblist), -				rcu_segcblist_n_cbs(&rdp->cblist), -				rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags); -	rcu_segcblist_init(&rdp->cblist); -	rcu_segcblist_disable(&rdp->cblist); -	return true; +	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer")); +	rcu_nocb_lock_irqsave(rdp, flags); +	smp_mb__after_spinlock(); /* Timer expire before wakeup. */ +	__call_rcu_nocb_wake(rdp, true, flags);  }  /* - * If necessary, kick off a new grace period, and either way wait - * for a subsequent grace period to complete. 
+ * No-CBs GP kthreads come here to wait for additional callbacks to show up + * or for grace periods to end.   */ -static void rcu_nocb_wait_gp(struct rcu_data *rdp) +static void nocb_gp_wait(struct rcu_data *my_rdp)  { -	unsigned long c; -	bool d; +	bool bypass = false; +	long bypass_ncbs; +	int __maybe_unused cpu = my_rdp->cpu; +	unsigned long cur_gp_seq;  	unsigned long flags; +	bool gotcbs; +	unsigned long j = jiffies; +	bool needwait_gp = false; // This prevents actual uninitialized use.  	bool needwake; -	struct rcu_node *rnp = rdp->mynode; +	bool needwake_gp; +	struct rcu_data *rdp; +	struct rcu_node *rnp; +	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning. -	local_irq_save(flags); -	c = rcu_seq_snap(&rcu_state.gp_seq); -	if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) { -		local_irq_restore(flags); -	} else { -		raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ -		needwake = rcu_start_this_gp(rnp, rdp, c); -		raw_spin_unlock_irqrestore_rcu_node(rnp, flags); -		if (needwake) +	/* +	 * Each pass through the following loop checks for CBs and for the +	 * nearest grace period (if any) to wait for next.  The CB kthreads +	 * and the global grace-period kthread are awakened if needed. +	 */ +	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); +		rcu_nocb_lock_irqsave(rdp, flags); +		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); +		if (bypass_ncbs && +		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || +		     bypass_ncbs > 2 * qhimark)) { +			// Bypass full or old, so flush it. +			(void)rcu_nocb_try_flush_bypass(rdp, j); +			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); +		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { +			rcu_nocb_unlock_irqrestore(rdp, flags); +			continue; /* No callbacks here, try next. */ +		} +		if (bypass_ncbs) { +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +					    TPS("Bypass")); +			bypass = true; +		} +		rnp = rdp->mynode; +		if (bypass) {  // Avoid race with first bypass CB. +			WRITE_ONCE(my_rdp->nocb_defer_wakeup, +				   RCU_NOCB_WAKE_NOT); +			del_timer(&my_rdp->nocb_timer); +		} +		// Advance callbacks if helpful and low contention. +		needwake_gp = false; +		if (!rcu_segcblist_restempty(&rdp->cblist, +					     RCU_NEXT_READY_TAIL) || +		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && +		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) { +			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */ +			needwake_gp = rcu_advance_cbs(rnp, rdp); +			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */ +		} +		// Need to wait on some grace period? +		WARN_ON_ONCE(!rcu_segcblist_restempty(&rdp->cblist, +						      RCU_NEXT_READY_TAIL)); +		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) { +			if (!needwait_gp || +			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq)) +				wait_gp_seq = cur_gp_seq; +			needwait_gp = true; +			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, +					    TPS("NeedWaitGP")); +		} +		if (rcu_segcblist_ready_cbs(&rdp->cblist)) { +			needwake = rdp->nocb_cb_sleep; +			WRITE_ONCE(rdp->nocb_cb_sleep, false); +			smp_mb(); /* CB invocation -after- GP end. */ +		} else { +			needwake = false; +		} +		rcu_nocb_unlock_irqrestore(rdp, flags); +		if (needwake) { +			swake_up_one(&rdp->nocb_cb_wq); +			gotcbs = true; +		} +		if (needwake_gp)  			rcu_gp_kthread_wake();  	} -	/* -	 * Wait for the grace period.  Do so interruptibly to avoid messing -	 * up the load average. 
-	 */ -	trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait")); -	for (;;) { +	my_rdp->nocb_gp_bypass = bypass; +	my_rdp->nocb_gp_gp = needwait_gp; +	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; +	if (bypass && !rcu_nocb_poll) { +		// At least one child with non-empty ->nocb_bypass, so set +		// timer in order to avoid stranding its callbacks. +		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); +		mod_timer(&my_rdp->nocb_bypass_timer, j + 2); +		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); +	} +	if (rcu_nocb_poll) { +		/* Polling, so trace if first poll in the series. */ +		if (gotcbs) +			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); +		schedule_timeout_interruptible(1); +	} else if (!needwait_gp) { +		/* Wait for callbacks to appear. */ +		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); +		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, +				!READ_ONCE(my_rdp->nocb_gp_sleep)); +		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep")); +	} else { +		rnp = my_rdp->mynode; +		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));  		swait_event_interruptible_exclusive( -			rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1], -			(d = rcu_seq_done(&rnp->gp_seq, c))); -		if (likely(d)) -			break; -		WARN_ON(signal_pending(current)); -		trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait")); +			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1], +			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) || +			!READ_ONCE(my_rdp->nocb_gp_sleep)); +		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));  	} -	trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait")); -	smp_mb(); /* Ensure that CB invocation happens after GP end. */ +	if (!rcu_nocb_poll) { +		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); +		if (bypass) +			del_timer(&my_rdp->nocb_bypass_timer); +		WRITE_ONCE(my_rdp->nocb_gp_sleep, true); +		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); +	} +	my_rdp->nocb_gp_seq = -1; +	WARN_ON(signal_pending(current));  }  /* - * Leaders come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * No-CBs grace-period-wait kthread.  There is one of these per group + * of CPUs, but only once at least one CPU in that group has come online + * at least once since boot.  This kthread checks for newly posted + * callbacks from any of the CPUs it is responsible for, waits for a + * grace period, then awakens all of the rcu_nocb_cb_kthread() instances + * that then have callback-invocation work to do.   */ -static void nocb_leader_wait(struct rcu_data *my_rdp) +static int rcu_nocb_gp_kthread(void *arg)  { -	bool firsttime = true; -	unsigned long flags; -	bool gotcbs; -	struct rcu_data *rdp; -	struct rcu_head **tail; - -wait_again: - -	/* Wait for callbacks to appear. */ -	if (!rcu_nocb_poll) { -		trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep")); -		swait_event_interruptible_exclusive(my_rdp->nocb_wq, -				!READ_ONCE(my_rdp->nocb_leader_sleep)); -		raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); -		my_rdp->nocb_leader_sleep = true; -		WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); -		del_timer(&my_rdp->nocb_timer); -		raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); -	} else if (firsttime) { -		firsttime = false; /* Don't drown trace log with "Poll"! */ -		trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll")); -	} - -	/* -	 * Each pass through the following loop checks a follower for CBs. -	 * We are our own first follower.  
Any CBs found are moved to -	 * nocb_gp_head, where they await a grace period. -	 */ -	gotcbs = false; -	smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */ -	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { -		rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); -		if (!rdp->nocb_gp_head) -			continue;  /* No CBs here, try next follower. */ - -		/* Move callbacks to wait-for-GP list, which is empty. */ -		WRITE_ONCE(rdp->nocb_head, NULL); -		rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); -		gotcbs = true; -	} - -	/* No callbacks?  Sleep a bit if polling, and go retry.  */ -	if (unlikely(!gotcbs)) { -		WARN_ON(signal_pending(current)); -		if (rcu_nocb_poll) { -			schedule_timeout_interruptible(1); -		} else { -			trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, -					    TPS("WokeEmpty")); -		} -		goto wait_again; -	} +	struct rcu_data *rdp = arg; -	/* Wait for one grace period. */ -	rcu_nocb_wait_gp(my_rdp); - -	/* Each pass through the following loop wakes a follower, if needed. */ -	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { -		if (!rcu_nocb_poll && -		    READ_ONCE(rdp->nocb_head) && -		    READ_ONCE(my_rdp->nocb_leader_sleep)) { -			raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); -			my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/ -			raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); -		} -		if (!rdp->nocb_gp_head) -			continue; /* No CBs, so no need to wake follower. */ - -		/* Append callbacks to follower's "done" list. */ -		raw_spin_lock_irqsave(&rdp->nocb_lock, flags); -		tail = rdp->nocb_follower_tail; -		rdp->nocb_follower_tail = rdp->nocb_gp_tail; -		*tail = rdp->nocb_gp_head; -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); -		if (rdp != my_rdp && tail == &rdp->nocb_follower_head) { -			/* List was empty, so wake up the follower.  */ -			swake_up_one(&rdp->nocb_wq); -		} +	for (;;) { +		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1); +		nocb_gp_wait(rdp); +		cond_resched_tasks_rcu_qs();  	} - -	/* If we (the leader) don't have CBs, go wait some more. */ -	if (!my_rdp->nocb_follower_head) -		goto wait_again; +	return 0;  }  /* - * Followers come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * Invoke any ready callbacks from the corresponding no-CBs CPU, + * then, if there are no more, wait for more to appear.   */ -static void nocb_follower_wait(struct rcu_data *rdp) +static void nocb_cb_wait(struct rcu_data *rdp)  { -	for (;;) { -		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FollowerSleep")); -		swait_event_interruptible_exclusive(rdp->nocb_wq, -					 READ_ONCE(rdp->nocb_follower_head)); -		if (smp_load_acquire(&rdp->nocb_follower_head)) { -			/* ^^^ Ensure CB invocation follows _head test. */ -			return; -		} -		WARN_ON(signal_pending(current)); -		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); +	unsigned long cur_gp_seq; +	unsigned long flags; +	bool needwake_gp = false; +	struct rcu_node *rnp = rdp->mynode; + +	local_irq_save(flags); +	rcu_momentary_dyntick_idle(); +	local_irq_restore(flags); +	local_bh_disable(); +	rcu_do_batch(rdp); +	local_bh_enable(); +	lockdep_assert_irqs_enabled(); +	rcu_nocb_lock_irqsave(rdp, flags); +	if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && +	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) && +	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */ +		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp); +		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. 
*/ +	} +	if (rcu_segcblist_ready_cbs(&rdp->cblist)) { +		rcu_nocb_unlock_irqrestore(rdp, flags); +		if (needwake_gp) +			rcu_gp_kthread_wake(); +		return; +	} + +	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); +	WRITE_ONCE(rdp->nocb_cb_sleep, true); +	rcu_nocb_unlock_irqrestore(rdp, flags); +	if (needwake_gp) +		rcu_gp_kthread_wake(); +	swait_event_interruptible_exclusive(rdp->nocb_cb_wq, +				 !READ_ONCE(rdp->nocb_cb_sleep)); +	if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */ +		/* ^^^ Ensure CB invocation follows _sleep test. */ +		return;  	} +	WARN_ON(signal_pending(current)); +	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));  }  /* - * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes - * callbacks queued by the corresponding no-CBs CPU, however, there is - * an optional leader-follower relationship so that the grace-period - * kthreads don't have to do quite so many wakeups. + * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke + * nocb_cb_wait() to do the dirty work.   */ -static int rcu_nocb_kthread(void *arg) +static int rcu_nocb_cb_kthread(void *arg)  { -	int c, cl; -	unsigned long flags; -	struct rcu_head *list; -	struct rcu_head *next; -	struct rcu_head **tail;  	struct rcu_data *rdp = arg; -	/* Each pass through this loop invokes one batch of callbacks */ +	// Each pass through this loop does one callback batch, and, +	// if there are no more ready callbacks, waits for them.  	for (;;) { -		/* Wait for callbacks. */ -		if (rdp->nocb_leader == rdp) -			nocb_leader_wait(rdp); -		else -			nocb_follower_wait(rdp); - -		/* Pull the ready-to-invoke callbacks onto local list. */ -		raw_spin_lock_irqsave(&rdp->nocb_lock, flags); -		list = rdp->nocb_follower_head; -		rdp->nocb_follower_head = NULL; -		tail = rdp->nocb_follower_tail; -		rdp->nocb_follower_tail = &rdp->nocb_follower_head; -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); -		if (WARN_ON_ONCE(!list)) -			continue; -		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty")); - -		/* Each pass through the following loop invokes a callback. */ -		trace_rcu_batch_start(rcu_state.name, -				      atomic_long_read(&rdp->nocb_q_count_lazy), -				      rcu_get_n_cbs_nocb_cpu(rdp), -1); -		c = cl = 0; -		while (list) { -			next = list->next; -			/* Wait for enqueuing to complete, if needed. */ -			while (next == NULL && &list->next != tail) { -				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, -						    TPS("WaitQueue")); -				schedule_timeout_interruptible(1); -				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, -						    TPS("WokeQueue")); -				next = list->next; -			} -			debug_rcu_head_unqueue(list); -			local_bh_disable(); -			if (__rcu_reclaim(rcu_state.name, list)) -				cl++; -			c++; -			local_bh_enable(); -			cond_resched_tasks_rcu_qs(); -			list = next; -		} -		trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1); -		smp_mb__before_atomic();  /* _add after CB invocation. 
*/ -		atomic_long_add(-c, &rdp->nocb_q_count); -		atomic_long_add(-cl, &rdp->nocb_q_count_lazy); +		nocb_cb_wait(rdp); +		cond_resched_tasks_rcu_qs();  	}  	return 0;  } @@ -1993,14 +2159,14 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)  	unsigned long flags;  	int ndw; -	raw_spin_lock_irqsave(&rdp->nocb_lock, flags); +	rcu_nocb_lock_irqsave(rdp, flags);  	if (!rcu_nocb_need_deferred_wakeup(rdp)) { -		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); +		rcu_nocb_unlock_irqrestore(rdp, flags);  		return;  	}  	ndw = READ_ONCE(rdp->nocb_defer_wakeup);  	WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); -	__wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); +	wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);  	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));  } @@ -2027,6 +2193,7 @@ void __init rcu_init_nohz(void)  {  	int cpu;  	bool need_rcu_nocb_mask = false; +	struct rcu_data *rdp;  #if defined(CONFIG_NO_HZ_FULL)  	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask)) @@ -2060,67 +2227,63 @@ void __init rcu_init_nohz(void)  	if (rcu_nocb_poll)  		pr_info("\tPoll for callbacks from no-CBs CPUs.\n"); -	for_each_cpu(cpu, rcu_nocb_mask) -		init_nocb_callback_list(per_cpu_ptr(&rcu_data, cpu)); +	for_each_cpu(cpu, rcu_nocb_mask) { +		rdp = per_cpu_ptr(&rcu_data, cpu); +		if (rcu_segcblist_empty(&rdp->cblist)) +			rcu_segcblist_init(&rdp->cblist); +		rcu_segcblist_offload(&rdp->cblist); +	}  	rcu_organize_nocb_kthreads();  }  /* Initialize per-rcu_data variables for no-CBs CPUs. */  static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)  { -	rdp->nocb_tail = &rdp->nocb_head; -	init_swait_queue_head(&rdp->nocb_wq); -	rdp->nocb_follower_tail = &rdp->nocb_follower_head; +	init_swait_queue_head(&rdp->nocb_cb_wq); +	init_swait_queue_head(&rdp->nocb_gp_wq);  	raw_spin_lock_init(&rdp->nocb_lock); +	raw_spin_lock_init(&rdp->nocb_bypass_lock); +	raw_spin_lock_init(&rdp->nocb_gp_lock);  	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0); +	timer_setup(&rdp->nocb_bypass_timer, do_nocb_bypass_wakeup_timer, 0); +	rcu_cblist_init(&rdp->nocb_bypass);  }  /*   * If the specified CPU is a no-CBs CPU that does not already have its - * rcuo kthread, spawn it.  If the CPUs are brought online out of order, - * this can require re-organizing the leader-follower relationships. + * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread + * for this CPU's group has not yet been created, spawn it as well.   */  static void rcu_spawn_one_nocb_kthread(int cpu)  { -	struct rcu_data *rdp; -	struct rcu_data *rdp_last; -	struct rcu_data *rdp_old_leader; -	struct rcu_data *rdp_spawn = per_cpu_ptr(&rcu_data, cpu); +	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); +	struct rcu_data *rdp_gp;  	struct task_struct *t;  	/*  	 * If this isn't a no-CBs CPU or if it already has an rcuo kthread,  	 * then nothing to do.  	 */ -	if (!rcu_is_nocb_cpu(cpu) || rdp_spawn->nocb_kthread) +	if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)  		return; -	/* If we didn't spawn the leader first, reorganize! 
*/ -	rdp_old_leader = rdp_spawn->nocb_leader; -	if (rdp_old_leader != rdp_spawn && !rdp_old_leader->nocb_kthread) { -		rdp_last = NULL; -		rdp = rdp_old_leader; -		do { -			rdp->nocb_leader = rdp_spawn; -			if (rdp_last && rdp != rdp_spawn) -				rdp_last->nocb_next_follower = rdp; -			if (rdp == rdp_spawn) { -				rdp = rdp->nocb_next_follower; -			} else { -				rdp_last = rdp; -				rdp = rdp->nocb_next_follower; -				rdp_last->nocb_next_follower = NULL; -			} -		} while (rdp); -		rdp_spawn->nocb_next_follower = rdp_old_leader; +	/* If we didn't spawn the GP kthread first, reorganize! */ +	rdp_gp = rdp->nocb_gp_rdp; +	if (!rdp_gp->nocb_gp_kthread) { +		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp, +				"rcuog/%d", rdp_gp->cpu); +		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) +			return; +		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);  	}  	/* Spawn the kthread for this CPU. */ -	t = kthread_run(rcu_nocb_kthread, rdp_spawn, +	t = kthread_run(rcu_nocb_cb_kthread, rdp,  			"rcuo%c/%d", rcu_state.abbr, cpu); -	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo kthread, OOM is now expected behavior\n", __func__)) +	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))  		return; -	WRITE_ONCE(rdp_spawn->nocb_kthread, t); +	WRITE_ONCE(rdp->nocb_cb_kthread, t); +	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);  }  /* @@ -2147,27 +2310,28 @@ static void __init rcu_spawn_nocb_kthreads(void)  		rcu_spawn_cpu_nocb_kthread(cpu);  } -/* How many follower CPU IDs per leader?  Default of -1 for sqrt(nr_cpu_ids). */ -static int rcu_nocb_leader_stride = -1; -module_param(rcu_nocb_leader_stride, int, 0444); +/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */ +static int rcu_nocb_gp_stride = -1; +module_param(rcu_nocb_gp_stride, int, 0444);  /* - * Initialize leader-follower relationships for all no-CBs CPU. + * Initialize GP-CB relationships for all no-CBs CPU.   */  static void __init rcu_organize_nocb_kthreads(void)  {  	int cpu; -	int ls = rcu_nocb_leader_stride; -	int nl = 0;  /* Next leader. */ +	bool firsttime = true; +	int ls = rcu_nocb_gp_stride; +	int nl = 0;  /* Next GP kthread. */  	struct rcu_data *rdp; -	struct rcu_data *rdp_leader = NULL;  /* Suppress misguided gcc warn. */ +	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */  	struct rcu_data *rdp_prev = NULL;  	if (!cpumask_available(rcu_nocb_mask))  		return;  	if (ls == -1) { -		ls = int_sqrt(nr_cpu_ids); -		rcu_nocb_leader_stride = ls; +		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids); +		rcu_nocb_gp_stride = ls;  	}  	/* @@ -2178,39 +2342,24 @@ static void __init rcu_organize_nocb_kthreads(void)  	for_each_cpu(cpu, rcu_nocb_mask) {  		rdp = per_cpu_ptr(&rcu_data, cpu);  		if (rdp->cpu >= nl) { -			/* New leader, set up for followers & next leader. */ +			/* New GP kthread, set up for CBs & next GP. */  			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls; -			rdp->nocb_leader = rdp; -			rdp_leader = rdp; +			rdp->nocb_gp_rdp = rdp; +			rdp_gp = rdp; +			if (!firsttime && dump_tree) +				pr_cont("\n"); +			firsttime = false; +			pr_alert("%s: No-CB GP kthread CPU %d:", __func__, cpu);  		} else { -			/* Another follower, link to previous leader. */ -			rdp->nocb_leader = rdp_leader; -			rdp_prev->nocb_next_follower = rdp; +			/* Another CB kthread, link to previous GP kthread. 
*/ +			rdp->nocb_gp_rdp = rdp_gp; +			rdp_prev->nocb_next_cb_rdp = rdp; +			pr_alert(" %d", cpu);  		}  		rdp_prev = rdp;  	}  } -/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */ -static bool init_nocb_callback_list(struct rcu_data *rdp) -{ -	if (!rcu_is_nocb_cpu(rdp->cpu)) -		return false; - -	/* If there are early-boot callbacks, move them to nocb lists. */ -	if (!rcu_segcblist_empty(&rdp->cblist)) { -		rdp->nocb_head = rcu_segcblist_head(&rdp->cblist); -		rdp->nocb_tail = rcu_segcblist_tail(&rdp->cblist); -		atomic_long_set(&rdp->nocb_q_count, -				rcu_segcblist_n_cbs(&rdp->cblist)); -		atomic_long_set(&rdp->nocb_q_count_lazy, -				rcu_segcblist_n_lazy_cbs(&rdp->cblist)); -		rcu_segcblist_init(&rdp->cblist); -	} -	rcu_segcblist_disable(&rdp->cblist); -	return true; -} -  /*   * Bind the current task to the offloaded CPUs.  If there are no offloaded   * CPUs, leave the task unbound.  Splat if the bind attempt fails. @@ -2223,20 +2372,101 @@ void rcu_bind_current_to_nocb(void)  EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);  /* - * Return the number of RCU callbacks still queued from the specified - * CPU, which must be a nocbs CPU. + * Dump out nocb grace-period kthread state for the specified rcu_data + * structure.   */ -static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) +static void show_rcu_nocb_gp_state(struct rcu_data *rdp)  { -	return atomic_long_read(&rdp->nocb_q_count); +	struct rcu_node *rnp = rdp->mynode; + +	pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu\n", +		rdp->cpu, +		"kK"[!!rdp->nocb_gp_kthread], +		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)], +		"dD"[!!rdp->nocb_defer_wakeup], +		"tT"[timer_pending(&rdp->nocb_timer)], +		"bB"[timer_pending(&rdp->nocb_bypass_timer)], +		"sS"[!!rdp->nocb_gp_sleep], +		".W"[swait_active(&rdp->nocb_gp_wq)], +		".W"[swait_active(&rnp->nocb_gp_wq[0])], +		".W"[swait_active(&rnp->nocb_gp_wq[1])], +		".B"[!!rdp->nocb_gp_bypass], +		".G"[!!rdp->nocb_gp_gp], +		(long)rdp->nocb_gp_seq, +		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops)); +} + +/* Dump out nocb kthread state for the specified rcu_data structure. */ +static void show_rcu_nocb_state(struct rcu_data *rdp) +{ +	struct rcu_segcblist *rsclp = &rdp->cblist; +	bool waslocked; +	bool wastimer; +	bool wassleep; + +	if (rdp->nocb_gp_rdp == rdp) +		show_rcu_nocb_gp_state(rdp); + +	pr_info("   CB %d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%c%c%c q%ld\n", +		rdp->cpu, rdp->nocb_gp_rdp->cpu, +		"kK"[!!rdp->nocb_cb_kthread], +		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)], +		"cC"[!!atomic_read(&rdp->nocb_lock_contended)], +		"lL"[raw_spin_is_locked(&rdp->nocb_lock)], +		"sS"[!!rdp->nocb_cb_sleep], +		".W"[swait_active(&rdp->nocb_cb_wq)], +		jiffies - rdp->nocb_bypass_first, +		jiffies - rdp->nocb_nobypass_last, +		rdp->nocb_nobypass_count, +		".D"[rcu_segcblist_ready_cbs(rsclp)], +		".W"[!rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)], +		".R"[!rcu_segcblist_restempty(rsclp, RCU_WAIT_TAIL)], +		".N"[!rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL)], +		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)], +		rcu_segcblist_n_cbs(&rdp->cblist)); + +	/* It is OK for GP kthreads to have GP state. */ +	if (rdp->nocb_gp_rdp == rdp) +		return; + +	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock); +	wastimer = timer_pending(&rdp->nocb_timer); +	wassleep = swait_active(&rdp->nocb_gp_wq); +	if (!rdp->nocb_defer_wakeup && !rdp->nocb_gp_sleep && +	    !waslocked && !wastimer && !wassleep) +		return;  /* Nothing untowards. */ + +	pr_info("   !!! 
%c%c%c%c %c\n", +		"lL"[waslocked], +		"dD"[!!rdp->nocb_defer_wakeup], +		"tT"[wastimer], +		"sS"[!!rdp->nocb_gp_sleep], +		".W"[wassleep]);  }  #else /* #ifdef CONFIG_RCU_NOCB_CPU */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* No ->nocb_lock to acquire.  */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ +} + +/* No ->nocb_lock to release.  */ +static void rcu_nocb_unlock(struct rcu_data *rdp)  { -	WARN_ON_ONCE(1); /* Should be dead code. */ -	return false; +} + +/* No ->nocb_lock to release.  */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, +				       unsigned long flags) +{ +	local_irq_restore(flags); +} + +/* Lockdep check that ->cblist may be safely accessed. */ +static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp) +{ +	lockdep_assert_irqs_disabled();  }  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq) @@ -2252,19 +2482,24 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)  {  } -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, -			    bool lazy, unsigned long flags) +static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, +				  unsigned long j)  { -	return false; +	return true;  } -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, -						     struct rcu_data *rdp, -						     unsigned long flags) +static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, +				bool *was_alldone, unsigned long flags)  {  	return false;  } +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, +				 unsigned long flags) +{ +	WARN_ON_ONCE(1);  /* Should be dead code! */ +} +  static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)  {  } @@ -2286,14 +2521,8 @@ static void __init rcu_spawn_nocb_kthreads(void)  {  } -static bool init_nocb_callback_list(struct rcu_data *rdp) -{ -	return false; -} - -static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) +static void show_rcu_nocb_state(struct rcu_data *rdp)  { -	return 0;  }  #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */  |
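
The patch adds rcu_nocb_bypass_lock() and rcu_nocb_wait_contended(): the bypass lock is taken by trylock first, and only if that fails is contention advertised in ->nocb_lock_contended so that a CPU flooding call_rcu() can throttle itself before touching the bypass again. Below is a minimal user-space sketch of that contention-flagging pattern using pthreads and C11 atomics; struct bypass and the function names are illustrative stand-ins, not the kernel's data structures or API.

#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

struct bypass {
	pthread_mutex_t lock;       /* stand-in for ->nocb_bypass_lock */
	atomic_int lock_contended;  /* stand-in for ->nocb_lock_contended */
};

/* Acquire the bypass lock, flagging contention if we have to wait. */
static void bypass_lock(struct bypass *b)
{
	if (pthread_mutex_trylock(&b->lock) == 0)
		return;                              /* uncontended fast path */
	atomic_fetch_add(&b->lock_contended, 1);     /* advertise contention */
	pthread_mutex_lock(&b->lock);                /* then wait for the lock */
	atomic_fetch_sub(&b->lock_contended, 1);     /* contention resolved */
}

/* A callback flooder throttles itself until nobody else wants the lock. */
static void bypass_wait_uncontended(struct bypass *b)
{
	while (atomic_load(&b->lock_contended))
		sched_yield();
}

int main(void)
{
	struct bypass b = { .lock = PTHREAD_MUTEX_INITIALIZER };

	bypass_wait_uncontended(&b);
	bypass_lock(&b);
	/* ... enqueue onto or flush the bypass list here ... */
	pthread_mutex_unlock(&b.lock);
	printf("contended now: %d\n", atomic_load(&b.lock_contended));
	return 0;
}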
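
As the comment above rcu_nocb_try_bypass() explains, bypassing exists only to avoid ->nocb_lock contention at high call_rcu() rates: a per-CPU count of direct ->cblist enqueues is kept, and only once more than nocb_nobypass_lim_per_jiffy of them (default 16 * 1000 / HZ, i.e. roughly 16000 call_rcu() invocations per second per CPU) arrive within one jiffy do further callbacks go onto ->nocb_bypass; each new jiffy decays the count by one jiffy's quota. The following stand-alone sketch shows just that rate-limit decision; LIM_PER_JIFFY and should_bypass() are made-up names, and the kernel's additional flushing of an old or over-full bypass is omitted.

#include <stdbool.h>
#include <stdio.h>

#define LIM_PER_JIFFY 16   /* stand-in for nocb_nobypass_lim_per_jiffy */

struct rate_state {
	unsigned long last_jiffy;  /* jiffy of the most recent enqueue */
	unsigned long count;       /* recent direct-enqueue count */
};

/* Return true if this enqueue should go onto the bypass list. */
static bool should_bypass(struct rate_state *rs, unsigned long now)
{
	unsigned long c;

	if (now == rs->last_jiffy) {
		c = rs->count + 1;                /* same jiffy: keep counting */
	} else {
		rs->last_jiffy = now;
		/* New jiffy: forgive one jiffy's quota, clamped to the limit. */
		if (rs->count < LIM_PER_JIFFY)
			c = 0;
		else if (rs->count - LIM_PER_JIFFY > LIM_PER_JIFFY)
			c = LIM_PER_JIFFY;
		else
			c = rs->count - LIM_PER_JIFFY;
	}
	rs->count = c;
	/* Below the limit: enqueue directly onto ->cblist; otherwise bypass. */
	return c >= LIM_PER_JIFFY;
}

int main(void)
{
	struct rate_state rs = { 0, 0 };
	unsigned long j, i;

	for (j = 1; j <= 2; j++)
		for (i = 0; i < 20; i++)
			printf("jiffy %lu call %lu -> %s\n", j, i,
			       should_bypass(&rs, j) ? "bypass" : "cblist");
	return 0;
}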
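
Finally, rcu_organize_nocb_kthreads() now carves the no-CBs CPUs into groups with a stride of nr_cpu_ids / int_sqrt(nr_cpu_ids) (overridable via rcu_nocb_gp_stride): the first no-CBs CPU at or past each group boundary hosts the rcuog grace-period kthread, and the remaining CPUs in the group chain their rcuo CB kthreads to it. A small sketch of that grouping rule, assuming a 16-CPU system in which every CPU is offloaded; sqrt() stands in for the kernel's int_sqrt().

#include <math.h>
#include <stdio.h>

#define DIV_ROUND_UP(n, d) (((n) + (d) - 1) / (d))

int main(void)
{
	int nr_cpu_ids = 16;                          /* pretend system size */
	int ls = nr_cpu_ids / (int)sqrt(nr_cpu_ids);  /* group stride, ~sqrt(N) */
	int nl = 0;                                   /* next group boundary */
	int cpu;

	for (cpu = 0; cpu < nr_cpu_ids; cpu++) {
		if (cpu >= nl) {
			/* Group boundary: this CPU hosts the group's rcuog kthread. */
			nl = DIV_ROUND_UP(cpu + 1, ls) * ls;
			printf("%srcuog/%d serves CPUs:", cpu ? "\n" : "", cpu);
		}
		printf(" %d", cpu);   /* this CPU's rcuo CB kthread joins the group */
	}
	printf("\n");
	return 0;
}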