diff options
Diffstat (limited to 'kernel/rcu/tree.c')
-rw-r--r-- | kernel/rcu/tree.c | 283 |
1 files changed, 180 insertions, 103 deletions
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index e641cc681901..a60616e69b66 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -79,9 +79,6 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *); static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = { .gpwrap = true, -#ifdef CONFIG_RCU_NOCB_CPU - .cblist.flags = SEGCBLIST_RCU_CORE, -#endif }; static struct rcu_state rcu_state = { .level = { &rcu_state.node[0] }, @@ -97,6 +94,9 @@ static struct rcu_state rcu_state = { .srs_cleanup_work = __WORK_INITIALIZER(rcu_state.srs_cleanup_work, rcu_sr_normal_gp_cleanup_work), .srs_cleanups_pending = ATOMIC_INIT(0), +#ifdef CONFIG_RCU_NOCB_CPU + .nocb_mutex = __MUTEX_INITIALIZER(rcu_state.nocb_mutex), +#endif }; /* Dump rcu_node combining tree at boot to verify correct setup. */ @@ -283,37 +283,45 @@ void rcu_softirq_qs(void) } /* - * Reset the current CPU's ->dynticks counter to indicate that the + * Reset the current CPU's RCU_WATCHING counter to indicate that the * newly onlined CPU is no longer in an extended quiescent state. * This will either leave the counter unchanged, or increment it * to the next non-quiescent value. * * The non-atomic test/increment sequence works because the upper bits - * of the ->dynticks counter are manipulated only by the corresponding CPU, + * of the ->state variable are manipulated only by the corresponding CPU, * or when the corresponding CPU is offline. */ -static void rcu_dynticks_eqs_online(void) +static void rcu_watching_online(void) { - if (ct_dynticks() & RCU_DYNTICKS_IDX) + if (ct_rcu_watching() & CT_RCU_WATCHING) return; - ct_state_inc(RCU_DYNTICKS_IDX); + ct_state_inc(CT_RCU_WATCHING); } /* - * Return true if the snapshot returned from rcu_dynticks_snap() + * Return true if the snapshot returned from ct_rcu_watching() * indicates that RCU is in an extended quiescent state. */ -static bool rcu_dynticks_in_eqs(int snap) +static bool rcu_watching_snap_in_eqs(int snap) { - return !(snap & RCU_DYNTICKS_IDX); + return !(snap & CT_RCU_WATCHING); } -/* - * Return true if the CPU corresponding to the specified rcu_data - * structure has spent some time in an extended quiescent state since - * rcu_dynticks_snap() returned the specified snapshot. +/** + * rcu_watching_snap_stopped_since() - Has RCU stopped watching a given CPU + * since the specified @snap? + * + * @rdp: The rcu_data corresponding to the CPU for which to check EQS. + * @snap: rcu_watching snapshot taken when the CPU wasn't in an EQS. + * + * Returns true if the CPU corresponding to @rdp has spent some time in an + * extended quiescent state since @snap. Note that this doesn't check if it + * /still/ is in an EQS, just that it went through one since @snap. + * + * This is meant to be used in a loop waiting for a CPU to go through an EQS. */ -static bool rcu_dynticks_in_eqs_since(struct rcu_data *rdp, int snap) +static bool rcu_watching_snap_stopped_since(struct rcu_data *rdp, int snap) { /* * The first failing snapshot is already ordered against the accesses @@ -323,26 +331,29 @@ static bool rcu_dynticks_in_eqs_since(struct rcu_data *rdp, int snap) * performed by the remote CPU prior to entering idle and therefore can * rely solely on acquire semantics. */ - return snap != ct_dynticks_cpu_acquire(rdp->cpu); + if (WARN_ON_ONCE(rcu_watching_snap_in_eqs(snap))) + return true; + + return snap != ct_rcu_watching_cpu_acquire(rdp->cpu); } /* * Return true if the referenced integer is zero while the specified * CPU remains within a single extended quiescent state. */ -bool rcu_dynticks_zero_in_eqs(int cpu, int *vp) +bool rcu_watching_zero_in_eqs(int cpu, int *vp) { int snap; // If not quiescent, force back to earlier extended quiescent state. - snap = ct_dynticks_cpu(cpu) & ~RCU_DYNTICKS_IDX; - smp_rmb(); // Order ->dynticks and *vp reads. + snap = ct_rcu_watching_cpu(cpu) & ~CT_RCU_WATCHING; + smp_rmb(); // Order CT state and *vp reads. if (READ_ONCE(*vp)) return false; // Non-zero, so report failure; - smp_rmb(); // Order *vp read and ->dynticks re-read. + smp_rmb(); // Order *vp read and CT state re-read. // If still in the same extended quiescent state, we are good! - return snap == ct_dynticks_cpu(cpu); + return snap == ct_rcu_watching_cpu(cpu); } /* @@ -356,17 +367,17 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp) * * The caller must have disabled interrupts and must not be idle. */ -notrace void rcu_momentary_dyntick_idle(void) +notrace void rcu_momentary_eqs(void) { int seq; raw_cpu_write(rcu_data.rcu_need_heavy_qs, false); - seq = ct_state_inc(2 * RCU_DYNTICKS_IDX); + seq = ct_state_inc(2 * CT_RCU_WATCHING); /* It is illegal to call this from idle state. */ - WARN_ON_ONCE(!(seq & RCU_DYNTICKS_IDX)); + WARN_ON_ONCE(!(seq & CT_RCU_WATCHING)); rcu_preempt_deferred_qs(current); } -EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle); +EXPORT_SYMBOL_GPL(rcu_momentary_eqs); /** * rcu_is_cpu_rrupt_from_idle - see if 'interrupted' from idle @@ -388,13 +399,13 @@ static int rcu_is_cpu_rrupt_from_idle(void) lockdep_assert_irqs_disabled(); /* Check for counter underflows */ - RCU_LOCKDEP_WARN(ct_dynticks_nesting() < 0, - "RCU dynticks_nesting counter underflow!"); - RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() <= 0, - "RCU dynticks_nmi_nesting counter underflow/zero!"); + RCU_LOCKDEP_WARN(ct_nesting() < 0, + "RCU nesting counter underflow!"); + RCU_LOCKDEP_WARN(ct_nmi_nesting() <= 0, + "RCU nmi_nesting counter underflow/zero!"); /* Are we at first interrupt nesting level? */ - nesting = ct_dynticks_nmi_nesting(); + nesting = ct_nmi_nesting(); if (nesting > 1) return false; @@ -404,7 +415,7 @@ static int rcu_is_cpu_rrupt_from_idle(void) WARN_ON_ONCE(!nesting && !is_idle_task(current)); /* Does CPU appear to be idle from an RCU standpoint? */ - return ct_dynticks_nesting() == 0; + return ct_nesting() == 0; } #define DEFAULT_RCU_BLIMIT (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD) ? 1000 : 10) @@ -596,12 +607,12 @@ void rcu_irq_exit_check_preempt(void) { lockdep_assert_irqs_disabled(); - RCU_LOCKDEP_WARN(ct_dynticks_nesting() <= 0, - "RCU dynticks_nesting counter underflow/zero!"); - RCU_LOCKDEP_WARN(ct_dynticks_nmi_nesting() != - DYNTICK_IRQ_NONIDLE, - "Bad RCU dynticks_nmi_nesting counter\n"); - RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(), + RCU_LOCKDEP_WARN(ct_nesting() <= 0, + "RCU nesting counter underflow/zero!"); + RCU_LOCKDEP_WARN(ct_nmi_nesting() != + CT_NESTING_IRQ_NONIDLE, + "Bad RCU nmi_nesting counter\n"); + RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(), "RCU in extended quiescent state!"); } #endif /* #ifdef CONFIG_PROVE_RCU */ @@ -641,7 +652,7 @@ void __rcu_irq_enter_check_tick(void) if (in_nmi()) return; - RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(), + RCU_LOCKDEP_WARN(!rcu_is_watching_curr_cpu(), "Illegal rcu_irq_enter_check_tick() from extended quiescent state"); if (!tick_nohz_full_cpu(rdp->cpu) || @@ -723,7 +734,7 @@ notrace bool rcu_is_watching(void) bool ret; preempt_disable_notrace(); - ret = !rcu_dynticks_curr_cpu_in_eqs(); + ret = rcu_is_watching_curr_cpu(); preempt_enable_notrace(); return ret; } @@ -765,11 +776,11 @@ static void rcu_gpnum_ovf(struct rcu_node *rnp, struct rcu_data *rdp) } /* - * Snapshot the specified CPU's dynticks counter so that we can later + * Snapshot the specified CPU's RCU_WATCHING counter so that we can later * credit them with an implicit quiescent state. Return 1 if this CPU * is in dynticks idle mode, which is an extended quiescent state. */ -static int dyntick_save_progress_counter(struct rcu_data *rdp) +static int rcu_watching_snap_save(struct rcu_data *rdp) { /* * Full ordering between remote CPU's post idle accesses and updater's @@ -782,8 +793,8 @@ static int dyntick_save_progress_counter(struct rcu_data *rdp) * Ordering between remote CPU's pre idle accesses and post grace period * updater's accesses is enforced by the below acquire semantic. */ - rdp->dynticks_snap = ct_dynticks_cpu_acquire(rdp->cpu); - if (rcu_dynticks_in_eqs(rdp->dynticks_snap)) { + rdp->watching_snap = ct_rcu_watching_cpu_acquire(rdp->cpu); + if (rcu_watching_snap_in_eqs(rdp->watching_snap)) { trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti")); rcu_gpnum_ovf(rdp->mynode, rdp); return 1; @@ -794,14 +805,14 @@ static int dyntick_save_progress_counter(struct rcu_data *rdp) /* * Returns positive if the specified CPU has passed through a quiescent state * by virtue of being in or having passed through an dynticks idle state since - * the last call to dyntick_save_progress_counter() for this same CPU, or by + * the last call to rcu_watching_snap_save() for this same CPU, or by * virtue of having been offline. * * Returns negative if the specified CPU needs a force resched. * * Returns zero otherwise. */ -static int rcu_implicit_dynticks_qs(struct rcu_data *rdp) +static int rcu_watching_snap_recheck(struct rcu_data *rdp) { unsigned long jtsq; int ret = 0; @@ -815,7 +826,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp) * read-side critical section that started before the beginning * of the current RCU grace period. */ - if (rcu_dynticks_in_eqs_since(rdp, rdp->dynticks_snap)) { + if (rcu_watching_snap_stopped_since(rdp, rdp->watching_snap)) { trace_rcu_fqs(rcu_state.name, rdp->gp_seq, rdp->cpu, TPS("dti")); rcu_gpnum_ovf(rnp, rdp); return 1; @@ -1649,7 +1660,7 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work) * the done tail list manipulations are protected here. */ done = smp_load_acquire(&rcu_state.srs_done_tail); - if (!done) + if (WARN_ON_ONCE(!done)) return; WARN_ON_ONCE(!rcu_sr_is_wait_head(done)); @@ -1984,10 +1995,10 @@ static void rcu_gp_fqs(bool first_time) if (first_time) { /* Collect dyntick-idle snapshots. */ - force_qs_rnp(dyntick_save_progress_counter); + force_qs_rnp(rcu_watching_snap_save); } else { /* Handle dyntick-idle and offline CPUs. */ - force_qs_rnp(rcu_implicit_dynticks_qs); + force_qs_rnp(rcu_watching_snap_recheck); } /* Clear flag to prevent immediate re-entry. */ if (READ_ONCE(rcu_state.gp_flags) & RCU_GP_FLAG_FQS) { @@ -2383,7 +2394,6 @@ rcu_report_qs_rdp(struct rcu_data *rdp) { unsigned long flags; unsigned long mask; - bool needacc = false; struct rcu_node *rnp; WARN_ON_ONCE(rdp->cpu != smp_processor_id()); @@ -2420,23 +2430,11 @@ rcu_report_qs_rdp(struct rcu_data *rdp) * to return true. So complain, but don't awaken. */ WARN_ON_ONCE(rcu_accelerate_cbs(rnp, rdp)); - } else if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) { - /* - * ...but NOCB kthreads may miss or delay callbacks acceleration - * if in the middle of a (de-)offloading process. - */ - needacc = true; } rcu_disable_urgency_upon_qs(rdp); rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags); /* ^^^ Released rnp->lock */ - - if (needacc) { - rcu_nocb_lock_irqsave(rdp, flags); - rcu_accelerate_cbs_unlocked(rnp, rdp); - rcu_nocb_unlock_irqrestore(rdp, flags); - } } } @@ -2791,24 +2789,6 @@ static __latent_entropy void rcu_core(void) unsigned long flags; struct rcu_data *rdp = raw_cpu_ptr(&rcu_data); struct rcu_node *rnp = rdp->mynode; - /* - * On RT rcu_core() can be preempted when IRQs aren't disabled. - * Therefore this function can race with concurrent NOCB (de-)offloading - * on this CPU and the below condition must be considered volatile. - * However if we race with: - * - * _ Offloading: In the worst case we accelerate or process callbacks - * concurrently with NOCB kthreads. We are guaranteed to - * call rcu_nocb_lock() if that happens. - * - * _ Deoffloading: In the worst case we miss callbacks acceleration or - * processing. This is fine because the early stage - * of deoffloading invokes rcu_core() after setting - * SEGCBLIST_RCU_CORE. So we guarantee that we'll process - * what could have been dismissed without the need to wait - * for the next rcu_pending() check in the next jiffy. - */ - const bool do_batch = !rcu_segcblist_completely_offloaded(&rdp->cblist); if (cpu_is_offline(smp_processor_id())) return; @@ -2828,17 +2808,17 @@ static __latent_entropy void rcu_core(void) /* No grace period and unregistered callbacks? */ if (!rcu_gp_in_progress() && - rcu_segcblist_is_enabled(&rdp->cblist) && do_batch) { - rcu_nocb_lock_irqsave(rdp, flags); + rcu_segcblist_is_enabled(&rdp->cblist) && !rcu_rdp_is_offloaded(rdp)) { + local_irq_save(flags); if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL)) rcu_accelerate_cbs_unlocked(rnp, rdp); - rcu_nocb_unlock_irqrestore(rdp, flags); + local_irq_restore(flags); } rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check()); /* If there are callbacks ready, invoke them. */ - if (do_batch && rcu_segcblist_ready_cbs(&rdp->cblist) && + if (!rcu_rdp_is_offloaded(rdp) && rcu_segcblist_ready_cbs(&rdp->cblist) && likely(READ_ONCE(rcu_scheduler_fully_active))) { rcu_do_batch(rdp); /* Re-invoke RCU core processing if there are callbacks remaining. */ @@ -2855,7 +2835,7 @@ static __latent_entropy void rcu_core(void) queue_work_on(rdp->cpu, rcu_gp_wq, &rdp->strict_work); } -static void rcu_core_si(struct softirq_action *h) +static void rcu_core_si(void) { rcu_core(); } @@ -3227,7 +3207,7 @@ struct kvfree_rcu_bulk_data { struct list_head list; struct rcu_gp_oldstate gp_snap; unsigned long nr_records; - void *records[]; + void *records[] __counted_by(nr_records); }; /* @@ -3539,10 +3519,10 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp) if (delayed_work_pending(&krcp->monitor_work)) { delay_left = krcp->monitor_work.timer.expires - jiffies; if (delay < delay_left) - mod_delayed_work(system_wq, &krcp->monitor_work, delay); + mod_delayed_work(system_unbound_wq, &krcp->monitor_work, delay); return; } - queue_delayed_work(system_wq, &krcp->monitor_work, delay); + queue_delayed_work(system_unbound_wq, &krcp->monitor_work, delay); } static void @@ -3584,18 +3564,15 @@ kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp) } /* - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout. + * Return: %true if a work is queued, %false otherwise. */ -static void kfree_rcu_monitor(struct work_struct *work) +static bool +kvfree_rcu_queue_batch(struct kfree_rcu_cpu *krcp) { - struct kfree_rcu_cpu *krcp = container_of(work, - struct kfree_rcu_cpu, monitor_work.work); unsigned long flags; + bool queued = false; int i, j; - // Drain ready for reclaim. - kvfree_rcu_drain_ready(krcp); - raw_spin_lock_irqsave(&krcp->lock, flags); // Attempt to start a new batch. @@ -3634,11 +3611,27 @@ static void kfree_rcu_monitor(struct work_struct *work) // be that the work is in the pending state when // channels have been detached following by each // other. - queue_rcu_work(system_wq, &krwp->rcu_work); + queued = queue_rcu_work(system_unbound_wq, &krwp->rcu_work); } } raw_spin_unlock_irqrestore(&krcp->lock, flags); + return queued; +} + +/* + * This function is invoked after the KFREE_DRAIN_JIFFIES timeout. + */ +static void kfree_rcu_monitor(struct work_struct *work) +{ + struct kfree_rcu_cpu *krcp = container_of(work, + struct kfree_rcu_cpu, monitor_work.work); + + // Drain ready for reclaim. + kvfree_rcu_drain_ready(krcp); + + // Queue a batch for a rest. + kvfree_rcu_queue_batch(krcp); // If there is nothing to detach, it means that our job is // successfully done here. In case of having at least one @@ -3704,7 +3697,7 @@ run_page_cache_worker(struct kfree_rcu_cpu *krcp) if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING && !atomic_xchg(&krcp->work_in_progress, 1)) { if (atomic_read(&krcp->backoff_page_cache_fill)) { - queue_delayed_work(system_wq, + queue_delayed_work(system_unbound_wq, &krcp->page_cache_work, msecs_to_jiffies(rcu_delay_page_cache_fill_msec)); } else { @@ -3767,7 +3760,8 @@ add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp, } // Finally insert and update the GP for this page. - bnode->records[bnode->nr_records++] = ptr; + bnode->nr_records++; + bnode->records[bnode->nr_records - 1] = ptr; get_state_synchronize_rcu_full(&bnode->gp_snap); atomic_inc(&(*krcp)->bulk_count[idx]); @@ -3859,6 +3853,86 @@ unlock_return: } EXPORT_SYMBOL_GPL(kvfree_call_rcu); +/** + * kvfree_rcu_barrier - Wait until all in-flight kvfree_rcu() complete. + * + * Note that a single argument of kvfree_rcu() call has a slow path that + * triggers synchronize_rcu() following by freeing a pointer. It is done + * before the return from the function. Therefore for any single-argument + * call that will result in a kfree() to a cache that is to be destroyed + * during module exit, it is developer's responsibility to ensure that all + * such calls have returned before the call to kmem_cache_destroy(). + */ +void kvfree_rcu_barrier(void) +{ + struct kfree_rcu_cpu_work *krwp; + struct kfree_rcu_cpu *krcp; + bool queued; + int i, cpu; + + /* + * Firstly we detach objects and queue them over an RCU-batch + * for all CPUs. Finally queued works are flushed for each CPU. + * + * Please note. If there are outstanding batches for a particular + * CPU, those have to be finished first following by queuing a new. + */ + for_each_possible_cpu(cpu) { + krcp = per_cpu_ptr(&krc, cpu); + + /* + * Check if this CPU has any objects which have been queued for a + * new GP completion. If not(means nothing to detach), we are done + * with it. If any batch is pending/running for this "krcp", below + * per-cpu flush_rcu_work() waits its completion(see last step). + */ + if (!need_offload_krc(krcp)) + continue; + + while (1) { + /* + * If we are not able to queue a new RCU work it means: + * - batches for this CPU are still in flight which should + * be flushed first and then repeat; + * - no objects to detach, because of concurrency. + */ + queued = kvfree_rcu_queue_batch(krcp); + + /* + * Bail out, if there is no need to offload this "krcp" + * anymore. As noted earlier it can run concurrently. + */ + if (queued || !need_offload_krc(krcp)) + break; + + /* There are ongoing batches. */ + for (i = 0; i < KFREE_N_BATCHES; i++) { + krwp = &(krcp->krw_arr[i]); + flush_rcu_work(&krwp->rcu_work); + } + } + } + + /* + * Now we guarantee that all objects are flushed. + */ + for_each_possible_cpu(cpu) { + krcp = per_cpu_ptr(&krc, cpu); + + /* + * A monitor work can drain ready to reclaim objects + * directly. Wait its completion if running or pending. + */ + cancel_delayed_work_sync(&krcp->monitor_work); + + for (i = 0; i < KFREE_N_BATCHES; i++) { + krwp = &(krcp->krw_arr[i]); + flush_rcu_work(&krwp->rcu_work); + } + } +} +EXPORT_SYMBOL_GPL(kvfree_rcu_barrier); + static unsigned long kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc) { @@ -4403,6 +4477,7 @@ static void rcu_barrier_callback(struct rcu_head *rhp) { unsigned long __maybe_unused s = rcu_state.barrier_sequence; + rhp->next = rhp; // Mark the callback as having been invoked. if (atomic_dec_and_test(&rcu_state.barrier_cpu_count)) { rcu_barrier_trace(TPS("LastCB"), -1, s); complete(&rcu_state.barrier_completion); @@ -4804,8 +4879,8 @@ rcu_boot_init_percpu_data(int cpu) /* Set up local state, ensuring consistent view of global state. */ rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu); INIT_WORK(&rdp->strict_work, strict_work_handler); - WARN_ON_ONCE(ct->dynticks_nesting != 1); - WARN_ON_ONCE(rcu_dynticks_in_eqs(ct_dynticks_cpu(cpu))); + WARN_ON_ONCE(ct->nesting != 1); + WARN_ON_ONCE(rcu_watching_snap_in_eqs(ct_rcu_watching_cpu(cpu))); rdp->barrier_seq_snap = rcu_state.barrier_sequence; rdp->rcu_ofl_gp_seq = rcu_state.gp_seq; rdp->rcu_ofl_gp_state = RCU_GP_CLEANED; @@ -4898,7 +4973,7 @@ int rcutree_prepare_cpu(unsigned int cpu) rdp->qlen_last_fqs_check = 0; rdp->n_force_qs_snap = READ_ONCE(rcu_state.n_force_qs); rdp->blimit = blimit; - ct->dynticks_nesting = 1; /* CPU not up, no tearing. */ + ct->nesting = 1; /* CPU not up, no tearing. */ raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ /* @@ -5058,7 +5133,7 @@ void rcutree_report_cpu_starting(unsigned int cpu) rnp = rdp->mynode; mask = rdp->grpmask; arch_spin_lock(&rcu_state.ofl_lock); - rcu_dynticks_eqs_online(); + rcu_watching_online(); raw_spin_lock(&rcu_state.barrier_lock); raw_spin_lock_rcu_node(rnp); WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask); @@ -5424,6 +5499,8 @@ static void __init rcu_init_one(void) while (i > rnp->grphi) rnp++; per_cpu_ptr(&rcu_data, i)->mynode = rnp; + per_cpu_ptr(&rcu_data, i)->barrier_head.next = + &per_cpu_ptr(&rcu_data, i)->barrier_head; rcu_boot_init_percpu_data(i); } } |