diff options
Diffstat (limited to 'fs/dlm/lock.c')
| -rw-r--r-- | fs/dlm/lock.c | 568 |
1 files changed, 279 insertions, 289 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f103b8c30592..8bee4f444afd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void) static inline void hold_rsb(struct dlm_rsb *r) { - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref, return 0; } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. */ - static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; int rv; - rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, &ls->ls_rsbtbl_lock); if (rv) write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -389,82 +386,54 @@ void dlm_put_rsb(struct dlm_rsb *r) put_rsb(r); } -static int pre_rsb_struct(struct dlm_ls *ls) -{ - struct dlm_rsb *r1, *r2; - int count = 0; - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { - spin_unlock_bh(&ls->ls_new_rsb_spin); - return 0; - } - spin_unlock_bh(&ls->ls_new_rsb_spin); - - r1 = dlm_allocate_rsb(ls); - r2 = dlm_allocate_rsb(ls); - - spin_lock_bh(&ls->ls_new_rsb_spin); - if (r1) { - list_add(&r1->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - if (r2) { - list_add(&r2->res_hashchain, &ls->ls_new_rsb); - ls->ls_new_rsb_count++; - } - count = ls->ls_new_rsb_count; - spin_unlock_bh(&ls->ls_new_rsb_spin); - - if (!count) - return -ENOMEM; - return 0; -} - /* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them - * again until a dlm_timer_resume() tries it again. + * again until a resume_scan_timer() tries it again. */ -static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { if (!dlm_locking_stopped(ls)) - mod_timer(&ls->ls_timer, jiffies); + mod_timer(&ls->ls_scan_timer, jiffies); } /* This function tries to resume the timer callback if a rsb - * is on the toss list and no timer is pending. It might that + * is on the scan list and no timer is pending. It might that * the first entry is on currently executed as timer callback * but we don't care if a timer queued up again and does * nothing. Should be a rare case. */ -void dlm_timer_resume(struct dlm_ls *ls) +void resume_scan_timer(struct dlm_ls *ls) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_toss_q_lock); - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - if (r && !timer_pending(&ls->ls_timer)) - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); } -/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ -static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) { struct dlm_rsb *first; - spin_lock_bh(&ls->ls_toss_q_lock); + /* active rsbs should never be on the scan list */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); + + spin_lock_bh(&ls->ls_scan_lock); r->res_toss_time = 0; /* if the rsb is not queued do nothing */ - if (list_empty(&r->res_toss_q_list)) + if (list_empty(&r->res_scan_list)) goto out; /* get the first element before delete */ - first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_del_init(&r->res_toss_q_list); + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); /* check if the first element was the rsb we deleted */ if (first == r) { /* try to get the new first element, if the list @@ -474,70 +443,59 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) * if the list isn't empty and a new first element got * in place, set the new timer expire time. */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!first) - timer_delete(&ls->ls_timer); + timer_delete(&ls->ls_scan_timer); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } out: - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } -/* Caller must held ls_rsbtbl_lock and need to be called every time - * when either the rsb enters toss state or the toss state changes - * the dir/master nodeid. - */ -static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) { int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *first; - /* If we're the directory record for this rsb, and - * we're not the master of it, then we need to wait - * for the master node to send us a dir remove for - * before removing the dir record. - */ - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - rsb_delete_toss_timer(ls, r); - return; - } + /* A dir record for a remote master rsb should never be on the scan list. */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* An active rsb should never be on the scan list. */ + WARN_ON(!rsb_flag(r, RSB_INACTIVE)); - spin_lock_bh(&ls->ls_toss_q_lock); + /* An rsb should not already be on the scan list. */ + WARN_ON(!list_empty(&r->res_scan_list)); + + spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */ r->res_toss_time = rsb_toss_jiffies(); - if (list_empty(&ls->ls_toss_q)) { + if (list_empty(&ls->ls_scan_list)) { /* if the queue is empty add the element and it's * our new expire time */ - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); - __rsb_mod_timer(ls, r->res_toss_time); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); } else { - /* check if the rsb was already queued, if so delete - * it from the toss queue - */ - if (!list_empty(&r->res_toss_q_list)) - list_del(&r->res_toss_q_list); - /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end * of the queue. If the list was empty before this * rsb expire time is our next expiration if it wasn't * the now new first elemet is our new expiration time */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); if (!first) - __rsb_mod_timer(ls, r->res_toss_time); + enable_scan_timer(ls, r->res_toss_time); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } /* if we hit contention we do in 250 ms a retry to trylock. @@ -547,9 +505,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) */ #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) -void dlm_rsb_toss_timer(struct timer_list *timer) +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) { - struct dlm_ls *ls = from_timer(ls, timer, ls_timer); + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *r; int rv; @@ -557,76 +517,63 @@ void dlm_rsb_toss_timer(struct timer_list *timer) while (1) { /* interrupting point to leave iteration when * recovery waits for timer_delete_sync(), recovery - * will take care to delete everything in toss queue. + * will take care to delete everything in scan list. */ if (dlm_locking_stopped(ls)) break; - rv = spin_trylock(&ls->ls_toss_q_lock); + rv = spin_trylock(&ls->ls_scan_lock); if (!rv) { /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!r) { - /* nothing to do anymore next rsb queue will - * set next mod_timer() expire. - */ - spin_unlock(&ls->ls_toss_q_lock); + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); break; } - /* test if the first rsb isn't expired yet, if - * so we stop freeing rsb from toss queue as - * the order in queue is ascending to the - * absolute res_toss_time jiffies + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. */ if (time_before(jiffies, r->res_toss_time)) { /* rearm with the next rsb to expire in the future */ - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock(&ls->ls_toss_q_lock); + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); break; } /* in find_rsb_dir/nodir there is a reverse order of this * lock, however this is only a trylock if we hit some * possible contention we try it again. - * - * This lock synchronized while holding ls_toss_q_lock - * synchronize everything that rsb_delete_toss_timer() - * or rsb_mod_timer() can't run after this timer callback - * deletes the rsb from the ls_toss_q. Whereas the other - * holders have always a priority to run as this is only - * a caching handling and the other holders might to put - * this rsb out of the toss state. */ rv = write_trylock(&ls->ls_rsbtbl_lock); if (!rv) { - spin_unlock(&ls->ls_toss_q_lock); + spin_unlock(&ls->ls_scan_lock); /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); - /* not necessary to held the ls_rsbtbl_lock when - * calling send_remove() - */ + /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); - /* remove the rsb out of the toss queue its gone - * drom DLM now - */ - list_del_init(&r->res_toss_q_list); - spin_unlock(&ls->ls_toss_q_lock); + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); - /* no rsb in this state should ever run a timer */ + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ WARN_ON(!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)); @@ -640,7 +587,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer) (dlm_dir_nodeid(r) != our_nodeid)) send_remove(r); - free_toss_rsb(r); + free_inactive_rsb(r); } } @@ -652,22 +599,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, struct dlm_rsb **r_ret) { struct dlm_rsb *r; - int count; - spin_lock_bh(&ls->ls_new_rsb_spin); - if (list_empty(&ls->ls_new_rsb)) { - count = ls->ls_new_rsb_count; - spin_unlock_bh(&ls->ls_new_rsb_spin); - log_debug(ls, "find_rsb retry %d %d %s", - count, dlm_config.ci_new_rsb_count, - (const char *)name); - return -EAGAIN; - } - - r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); - list_del(&r->res_hashchain); - ls->ls_new_rsb_count--; - spin_unlock_bh(&ls->ls_new_rsb_spin); + r = dlm_allocate_rsb(ls); + if (!r) + return -ENOMEM; r->res_ls = ls; r->res_length = len; @@ -679,7 +614,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); - INIT_LIST_HEAD(&r->res_toss_q_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); INIT_LIST_HEAD(&r->res_masters_list); @@ -702,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) { - return rhashtable_insert_fast(rhash, &rsb->res_node, - dlm_rhash_rsb_params); + int rv; + + rv = rhashtable_insert_fast(rhash, &rsb->res_node, + dlm_rhash_rsb_params); + if (!rv) + rsb_set_flag(rsb, RSB_HASHED); + + return rv; } /* @@ -733,7 +674,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -781,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -792,15 +733,8 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, } retry: - if (create) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - } - retry_lookup: - - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -812,9 +746,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * rsb is active, so we can't check master_nodeid without lock_rsb. */ - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } kref_get(&r->res_ref); @@ -822,17 +756,29 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. + /* + * The expectation here is that the rsb will have HASHED and + * INACTIVE flags set, and that the rsb can be moved from + * inactive back to active again. However, between releasing + * the read lock and acquiring the write lock, this rsb could + * have been removed from rsbtbl, and had HASHED cleared, to + * be freed. To deal with this case, we would normally need + * to repeat dlm_search_rsb_tree while holding the write lock, + * but rcu allows us to simply check the HASHED flag, because + * the rcu read lock means the rsb will not be freed yet. + * If the HASHED flag is not set, then the rsb is being freed, + * so we add a new rsb struct. If the HASHED flag is set, + * and INACTIVE is not set, it means another thread has + * made the rsb active, as we're expecting to do here, and + * we just repeat the lookup (this will be very unlikely.) */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -842,14 +788,14 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. */ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -859,7 +805,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -876,14 +822,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ - kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + /* A dir record will not be on the scan list. */ + if (r->res_dir_nodeid != our_nodeid) + del_scan(ls, r); + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); + kref_init(&r->res_ref); /* ref is now used in active state */ write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -898,9 +842,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, goto out; error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) - goto retry; - if (error) + if (WARN_ON_ONCE(error)) goto out; r->res_hash = hash; @@ -952,9 +894,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); out: @@ -976,13 +918,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, int error; retry: - error = pre_rsb_struct(ls); - if (error < 0) - goto out; - retry_lookup: - - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is in active state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -990,9 +927,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto do_new; } - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* @@ -1005,17 +942,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. - */ - error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + /* See comment in find_rsb_dir. */ + if (rsb_flag(r, RSB_HASHED)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1025,14 +959,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. */ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1044,21 +978,17 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -1070,10 +1000,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) { - goto retry; - } - if (error) + if (WARN_ON_ONCE(error)) goto out; r->res_hash = hash; @@ -1090,9 +1017,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1101,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, return error; } +/* + * rsb rcu usage + * + * While rcu read lock is held, the rsb cannot be freed, + * which allows a lookup optimization. + * + * Two threads are accessing the same rsb concurrently, + * the first (A) is trying to use the rsb, the second (B) + * is trying to free the rsb. + * + * thread A thread B + * (trying to use rsb) (trying to free rsb) + * + * A1. rcu read lock + * A2. rsbtbl read lock + * A3. look up rsb in rsbtbl + * A4. rsbtbl read unlock + * B1. rsbtbl write lock + * B2. look up rsb in rsbtbl + * B3. remove rsb from rsbtbl + * B4. clear rsb HASHED flag + * B5. rsbtbl write unlock + * B6. begin freeing rsb using rcu... + * + * (rsb is inactive, so try to make it active again) + * A5. read rsb HASHED flag (safe because rsb is not freed yet) + * A6. the rsb HASHED flag is not set, which it means the rsb + * is being removed from rsbtbl and freed, so don't use it. + * A7. rcu read unlock + * + * B7. ...finish freeing rsb using rcu + * A8. create a new rsb + * + * Without the rcu optimization, steps A5-8 would need to do + * an extra rsbtbl lookup: + * A5. rsbtbl write lock + * A6. look up rsb in rsbtbl, not found + * A7. rsbtbl write unlock + * A8. create a new rsb + */ + static int find_rsb(struct dlm_ls *ls, const void *name, int len, int from_nodeid, unsigned int flags, struct dlm_rsb **r_ret) { int dir_nodeid; uint32_t hash; + int rv; if (len > DLM_RESNAME_MAXLEN) return -EINVAL; @@ -1114,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len, hash = jhash(name, len, 0); dir_nodeid = dlm_hash2nodeid(ls, hash); + rcu_read_lock(); if (dlm_no_directory(ls)) - return find_rsb_nodir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); else - return find_rsb_dir(ls, name, len, hash, dir_nodeid, + rv = find_rsb_dir(ls, name, len, hash, dir_nodeid, from_nodeid, flags, r_ret); + rcu_read_unlock(); + return rv; } /* we have received a request and found that res_master_nodeid != our_nodeid, @@ -1166,7 +1138,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, - int from_nodeid, bool toss_list, unsigned int flags, + int from_nodeid, bool is_inactive, unsigned int flags, int *r_nodeid, int *result) { int fix_master = (flags & DLM_LU_RECOVER_MASTER); @@ -1190,9 +1162,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_nodeid = from_nodeid; rsb_set_flag(r, RSB_NEW_MASTER); - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "%s fix_master on toss", __func__); + if (is_inactive) { + /* I don't think we should ever find it inactive. */ + log_error(ls, "%s fix_master inactive", __func__); dlm_dump_rsb(r); } } @@ -1232,7 +1204,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no if (!from_master && !fix_master && (r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node - * finds the rsb on the keep list and ignores the remove, + * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup */ @@ -1276,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, - int len, unsigned int flags, int *r_nodeid, int *result) +static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; uint32_t hash; @@ -1304,19 +1276,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } retry: - error = pre_rsb_struct(ls); - if (error < 0) - return error; - - retry_lookup: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* because the rsb is active, we need to lock_rsb before @@ -1340,53 +1307,48 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto not_found; } - do_toss: + do_inactive: /* unlikely path - relookup under write */ write_lock_bh(&ls->ls_rsbtbl_lock); - /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock - * check if the rsb is still in toss state, if not relookup - */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* something as changed, very unlikely but * try again */ - goto retry_lookup; + goto retry; } } else { write_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found; } - /* because the rsb is inactive (on toss list), it's not refcounted - * and lock_rsb is not used, but is protected by the rsbtbl lock - */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - rsb_mod_timer(ls, r); - /* the rsb was inactive (on toss list) */ + /* A dir record rsb should never be on scan list. */ + /* Try to fix this with del_scan? */ + WARN_ON(!list_empty(&r->res_scan_list)); + write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; not_found: error = get_rsb_struct(ls, name, len, &r); - if (error == -EAGAIN) - goto retry; - if (error) + if (WARN_ON_ONCE(error)) goto out; r->res_hash = hash; r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - kref_init(&r->res_ref); - rsb_set_flag(r, RSB_TOSS); + rsb_set_flag(r, RSB_INACTIVE); write_lock_bh(&ls->ls_rsbtbl_lock); error = rsb_insert(r, &ls->ls_rsbtbl); @@ -1396,7 +1358,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, */ write_unlock_bh(&ls->ls_rsbtbl_lock); dlm_free_rsb(r); - goto retry_lookup; + goto retry; } else if (error) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */ @@ -1404,8 +1366,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto retry; } - list_add(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + list_add(&r->res_slow_list, &ls->ls_slow_inactive); write_unlock_bh(&ls->ls_rsbtbl_lock); if (result) @@ -1415,12 +1376,22 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, return error; } +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) +{ + int rv; + rcu_read_lock(); + rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result); + rcu_read_unlock(); + return rv; +} + static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (r->res_hash == hash) dlm_dump_rsb(r); } @@ -1442,15 +1413,28 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) read_unlock_bh(&ls->ls_rsbtbl_lock); } -static void toss_rsb(struct kref *kref) +static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; + int our_nodeid = dlm_our_nodeid(); DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - rsb_set_flag(r, RSB_TOSS); - list_move(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + + /* + * When the rsb becomes unused: + * - If it's not a dir record for a remote master rsb, + * then it is put on the scan list to be freed. + * - If it's a dir record for a remote master rsb, + * then it is kept in the inactive state until + * receive_remove() from the master node. + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + add_scan(ls, r); if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); @@ -1464,22 +1448,22 @@ static void unhold_rsb(struct dlm_rsb *r) { int rv; - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); - rv = kref_put(&r->res_ref, toss_rsb); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); + rv = kref_put(&r->res_ref, deactivate_rsb); DLM_ASSERT(!rv, dlm_dump_rsb(r);); } -void free_toss_rsb(struct dlm_rsb *r) +void free_inactive_rsb(struct dlm_rsb *r) { - WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); - DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); @@ -1504,11 +1488,15 @@ static void detach_lkb(struct dlm_lkb *lkb) } static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, - int start, int end) + unsigned long start, unsigned long end) { + struct xa_limit limit; struct dlm_lkb *lkb; int rv; + limit.max = end; + limit.min = start; + lkb = dlm_allocate_lkb(ls); if (!lkb) return -ENOMEM; @@ -1522,14 +1510,12 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); - write_lock_bh(&ls->ls_lkbidr_lock); - rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); - if (rv >= 0) - lkb->lkb_id = rv; - write_unlock_bh(&ls->ls_lkbidr_lock); + write_lock_bh(&ls->ls_lkbxa_lock); + rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC); + write_unlock_bh(&ls->ls_lkbxa_lock); if (rv < 0) { - log_error(ls, "create_lkb idr error %d", rv); + log_error(ls, "create_lkb xa error %d", rv); dlm_free_lkb(lkb); return rv; } @@ -1540,18 +1526,18 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - return _create_lkb(ls, lkb_ret, 1, 0); + return _create_lkb(ls, lkb_ret, 1, ULONG_MAX); } static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - read_lock_bh(&ls->ls_lkbidr_lock); - lkb = idr_find(&ls->ls_lkbidr, lkid); + read_lock_bh(&ls->ls_lkbxa_lock); + lkb = xa_load(&ls->ls_lkbxa, lkid); if (lkb) kref_get(&lkb->lkb_ref); - read_unlock_bh(&ls->ls_lkbidr_lock); + read_unlock_bh(&ls->ls_lkbxa_lock); *lkb_ret = lkb; return lkb ? 0 : -ENOENT; @@ -1576,10 +1562,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) int rv; rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, - &ls->ls_lkbidr_lock); + &ls->ls_lkbxa_lock); if (rv) { - idr_remove(&ls->ls_lkbidr, lkid); - write_unlock_bh(&ls->ls_lkbidr_lock); + xa_erase(&ls->ls_lkbxa, lkid); + write_unlock_bh(&ls->ls_lkbxa_lock); detach_lkb(lkb); @@ -4323,8 +4309,9 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - /* Look for name in rsb toss state, if it's there, kill it. - * If it's in non toss state, it's being used, and we should ignore this + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this * message. This is an expected race between the dir node sending a * request to the master node at the same time as the master node sends * a remove to the dir node. The resolution to that race is for the @@ -4347,16 +4334,18 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. */ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); @@ -4365,19 +4354,20 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + rsb_clear_flag(r, RSB_HASHED); write_unlock_bh(&ls->ls_rsbtbl_lock); - free_toss_rsb(r); + free_inactive_rsb(r); } static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) @@ -5444,7 +5434,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; if (!is_master(r)) { |