aboutsummaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c568
1 files changed, 279 insertions, 289 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index f103b8c30592..8bee4f444afd 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
-static void toss_rsb(struct kref *kref);
+static void deactivate_rsb(struct kref *kref);
/*
* Lock compatibilty matrix - thanks Steve
@@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void)
static inline void hold_rsb(struct dlm_rsb *r)
{
- /* rsbs in toss state never get referenced */
- WARN_ON(rsb_flag(r, RSB_TOSS));
+ /* inactive rsbs are not ref counted */
+ WARN_ON(rsb_flag(r, RSB_INACTIVE));
kref_get(&r->res_ref);
}
@@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
return 0;
}
-/* When all references to the rsb are gone it's transferred to
- the tossed list for later disposal. */
-
static void put_rsb(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
int rv;
- rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
+ rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
&ls->ls_rsbtbl_lock);
if (rv)
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -389,82 +386,54 @@ void dlm_put_rsb(struct dlm_rsb *r)
put_rsb(r);
}
-static int pre_rsb_struct(struct dlm_ls *ls)
-{
- struct dlm_rsb *r1, *r2;
- int count = 0;
-
- spin_lock_bh(&ls->ls_new_rsb_spin);
- if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
- spin_unlock_bh(&ls->ls_new_rsb_spin);
- return 0;
- }
- spin_unlock_bh(&ls->ls_new_rsb_spin);
-
- r1 = dlm_allocate_rsb(ls);
- r2 = dlm_allocate_rsb(ls);
-
- spin_lock_bh(&ls->ls_new_rsb_spin);
- if (r1) {
- list_add(&r1->res_hashchain, &ls->ls_new_rsb);
- ls->ls_new_rsb_count++;
- }
- if (r2) {
- list_add(&r2->res_hashchain, &ls->ls_new_rsb);
- ls->ls_new_rsb_count++;
- }
- count = ls->ls_new_rsb_count;
- spin_unlock_bh(&ls->ls_new_rsb_spin);
-
- if (!count)
- return -ENOMEM;
- return 0;
-}
-
/* connected with timer_delete_sync() in dlm_ls_stop() to stop
* new timers when recovery is triggered and don't run them
- * again until a dlm_timer_resume() tries it again.
+ * again until a resume_scan_timer() tries it again.
*/
-static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
+static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
{
if (!dlm_locking_stopped(ls))
- mod_timer(&ls->ls_timer, jiffies);
+ mod_timer(&ls->ls_scan_timer, jiffies);
}
/* This function tries to resume the timer callback if a rsb
- * is on the toss list and no timer is pending. It might that
+ * is on the scan list and no timer is pending. It might that
* the first entry is on currently executed as timer callback
* but we don't care if a timer queued up again and does
* nothing. Should be a rare case.
*/
-void dlm_timer_resume(struct dlm_ls *ls)
+void resume_scan_timer(struct dlm_ls *ls)
{
struct dlm_rsb *r;
- spin_lock_bh(&ls->ls_toss_q_lock);
- r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
- res_toss_q_list);
- if (r && !timer_pending(&ls->ls_timer))
- __rsb_mod_timer(ls, r->res_toss_time);
- spin_unlock_bh(&ls->ls_toss_q_lock);
+ spin_lock_bh(&ls->ls_scan_lock);
+ r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ if (r && !timer_pending(&ls->ls_scan_timer))
+ enable_scan_timer(ls, r->res_toss_time);
+ spin_unlock_bh(&ls->ls_scan_lock);
}
-/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */
-static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+/* ls_rsbtbl_lock must be held */
+
+static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
{
struct dlm_rsb *first;
- spin_lock_bh(&ls->ls_toss_q_lock);
+ /* active rsbs should never be on the scan list */
+ WARN_ON(!rsb_flag(r, RSB_INACTIVE));
+
+ spin_lock_bh(&ls->ls_scan_lock);
r->res_toss_time = 0;
/* if the rsb is not queued do nothing */
- if (list_empty(&r->res_toss_q_list))
+ if (list_empty(&r->res_scan_list))
goto out;
/* get the first element before delete */
- first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
- res_toss_q_list);
- list_del_init(&r->res_toss_q_list);
+ first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ list_del_init(&r->res_scan_list);
/* check if the first element was the rsb we deleted */
if (first == r) {
/* try to get the new first element, if the list
@@ -474,70 +443,59 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
* if the list isn't empty and a new first element got
* in place, set the new timer expire time.
*/
- first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
- res_toss_q_list);
+ first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
if (!first)
- timer_delete(&ls->ls_timer);
+ timer_delete(&ls->ls_scan_timer);
else
- __rsb_mod_timer(ls, first->res_toss_time);
+ enable_scan_timer(ls, first->res_toss_time);
}
out:
- spin_unlock_bh(&ls->ls_toss_q_lock);
+ spin_unlock_bh(&ls->ls_scan_lock);
}
-/* Caller must held ls_rsbtbl_lock and need to be called every time
- * when either the rsb enters toss state or the toss state changes
- * the dir/master nodeid.
- */
-static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
+static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
{
int our_nodeid = dlm_our_nodeid();
struct dlm_rsb *first;
- /* If we're the directory record for this rsb, and
- * we're not the master of it, then we need to wait
- * for the master node to send us a dir remove for
- * before removing the dir record.
- */
- if (!dlm_no_directory(ls) &&
- (r->res_master_nodeid != our_nodeid) &&
- (dlm_dir_nodeid(r) == our_nodeid)) {
- rsb_delete_toss_timer(ls, r);
- return;
- }
+ /* A dir record for a remote master rsb should never be on the scan list. */
+ WARN_ON(!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) == our_nodeid));
+
+ /* An active rsb should never be on the scan list. */
+ WARN_ON(!rsb_flag(r, RSB_INACTIVE));
- spin_lock_bh(&ls->ls_toss_q_lock);
+ /* An rsb should not already be on the scan list. */
+ WARN_ON(!list_empty(&r->res_scan_list));
+
+ spin_lock_bh(&ls->ls_scan_lock);
/* set the new rsb absolute expire time in the rsb */
r->res_toss_time = rsb_toss_jiffies();
- if (list_empty(&ls->ls_toss_q)) {
+ if (list_empty(&ls->ls_scan_list)) {
/* if the queue is empty add the element and it's
* our new expire time
*/
- list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
- __rsb_mod_timer(ls, r->res_toss_time);
+ list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
+ enable_scan_timer(ls, r->res_toss_time);
} else {
- /* check if the rsb was already queued, if so delete
- * it from the toss queue
- */
- if (!list_empty(&r->res_toss_q_list))
- list_del(&r->res_toss_q_list);
-
/* try to get the maybe new first element and then add
* to this rsb with the oldest expire time to the end
* of the queue. If the list was empty before this
* rsb expire time is our next expiration if it wasn't
* the now new first elemet is our new expiration time
*/
- first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
- res_toss_q_list);
- list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
+ first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
+ list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
if (!first)
- __rsb_mod_timer(ls, r->res_toss_time);
+ enable_scan_timer(ls, r->res_toss_time);
else
- __rsb_mod_timer(ls, first->res_toss_time);
+ enable_scan_timer(ls, first->res_toss_time);
}
- spin_unlock_bh(&ls->ls_toss_q_lock);
+ spin_unlock_bh(&ls->ls_scan_lock);
}
/* if we hit contention we do in 250 ms a retry to trylock.
@@ -547,9 +505,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
*/
#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
-void dlm_rsb_toss_timer(struct timer_list *timer)
+/* Called by lockspace scan_timer to free unused rsb's. */
+
+void dlm_rsb_scan(struct timer_list *timer)
{
- struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
+ struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
int our_nodeid = dlm_our_nodeid();
struct dlm_rsb *r;
int rv;
@@ -557,76 +517,63 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
while (1) {
/* interrupting point to leave iteration when
* recovery waits for timer_delete_sync(), recovery
- * will take care to delete everything in toss queue.
+ * will take care to delete everything in scan list.
*/
if (dlm_locking_stopped(ls))
break;
- rv = spin_trylock(&ls->ls_toss_q_lock);
+ rv = spin_trylock(&ls->ls_scan_lock);
if (!rv) {
/* rearm again try timer */
- __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+ enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
break;
}
- r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
- res_toss_q_list);
+ r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
+ res_scan_list);
if (!r) {
- /* nothing to do anymore next rsb queue will
- * set next mod_timer() expire.
- */
- spin_unlock(&ls->ls_toss_q_lock);
+ /* the next add_scan will enable the timer again */
+ spin_unlock(&ls->ls_scan_lock);
break;
}
- /* test if the first rsb isn't expired yet, if
- * so we stop freeing rsb from toss queue as
- * the order in queue is ascending to the
- * absolute res_toss_time jiffies
+ /*
+ * If the first rsb is not yet expired, then stop because the
+ * list is sorted with nearest expiration first.
*/
if (time_before(jiffies, r->res_toss_time)) {
/* rearm with the next rsb to expire in the future */
- __rsb_mod_timer(ls, r->res_toss_time);
- spin_unlock(&ls->ls_toss_q_lock);
+ enable_scan_timer(ls, r->res_toss_time);
+ spin_unlock(&ls->ls_scan_lock);
break;
}
/* in find_rsb_dir/nodir there is a reverse order of this
* lock, however this is only a trylock if we hit some
* possible contention we try it again.
- *
- * This lock synchronized while holding ls_toss_q_lock
- * synchronize everything that rsb_delete_toss_timer()
- * or rsb_mod_timer() can't run after this timer callback
- * deletes the rsb from the ls_toss_q. Whereas the other
- * holders have always a priority to run as this is only
- * a caching handling and the other holders might to put
- * this rsb out of the toss state.
*/
rv = write_trylock(&ls->ls_rsbtbl_lock);
if (!rv) {
- spin_unlock(&ls->ls_toss_q_lock);
+ spin_unlock(&ls->ls_scan_lock);
/* rearm again try timer */
- __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
+ enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
break;
}
- list_del(&r->res_rsbs_list);
+ list_del(&r->res_slow_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
+ rsb_clear_flag(r, RSB_HASHED);
- /* not necessary to held the ls_rsbtbl_lock when
- * calling send_remove()
- */
+ /* ls_rsbtbl_lock is not needed when calling send_remove() */
write_unlock(&ls->ls_rsbtbl_lock);
- /* remove the rsb out of the toss queue its gone
- * drom DLM now
- */
- list_del_init(&r->res_toss_q_list);
- spin_unlock(&ls->ls_toss_q_lock);
+ list_del_init(&r->res_scan_list);
+ spin_unlock(&ls->ls_scan_lock);
- /* no rsb in this state should ever run a timer */
+ /* An rsb that is a dir record for a remote master rsb
+ * cannot be removed, and should not have a timer enabled.
+ */
WARN_ON(!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid));
@@ -640,7 +587,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer)
(dlm_dir_nodeid(r) != our_nodeid))
send_remove(r);
- free_toss_rsb(r);
+ free_inactive_rsb(r);
}
}
@@ -652,22 +599,10 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
struct dlm_rsb **r_ret)
{
struct dlm_rsb *r;
- int count;
- spin_lock_bh(&ls->ls_new_rsb_spin);
- if (list_empty(&ls->ls_new_rsb)) {
- count = ls->ls_new_rsb_count;
- spin_unlock_bh(&ls->ls_new_rsb_spin);
- log_debug(ls, "find_rsb retry %d %d %s",
- count, dlm_config.ci_new_rsb_count,
- (const char *)name);
- return -EAGAIN;
- }
-
- r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
- list_del(&r->res_hashchain);
- ls->ls_new_rsb_count--;
- spin_unlock_bh(&ls->ls_new_rsb_spin);
+ r = dlm_allocate_rsb(ls);
+ if (!r)
+ return -ENOMEM;
r->res_ls = ls;
r->res_length = len;
@@ -679,7 +614,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
INIT_LIST_HEAD(&r->res_convertqueue);
INIT_LIST_HEAD(&r->res_waitqueue);
INIT_LIST_HEAD(&r->res_root_list);
- INIT_LIST_HEAD(&r->res_toss_q_list);
+ INIT_LIST_HEAD(&r->res_scan_list);
INIT_LIST_HEAD(&r->res_recover_list);
INIT_LIST_HEAD(&r->res_masters_list);
@@ -702,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
- return rhashtable_insert_fast(rhash, &rsb->res_node,
- dlm_rhash_rsb_params);
+ int rv;
+
+ rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+ dlm_rhash_rsb_params);
+ if (!rv)
+ rsb_set_flag(rsb, RSB_HASHED);
+
+ return rv;
}
/*
@@ -733,7 +674,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
* So, if the given rsb is on the toss list, it is moved to the keep list
* before being returned.
*
- * toss_rsb() happens when all local usage of the rsb is done, i.e. no
+ * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
* more refcounts exist, so the rsb is moved from the keep list to the
* toss list.
*
@@ -781,9 +722,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
*
* If someone sends us a request, we are the dir node, and we do
* not find the rsb anywhere, then recreate it. This happens if
- * someone sends us a request after we have removed/freed an rsb
- * from our toss list. (They sent a request instead of lookup
- * because they are using an rsb from their toss list.)
+ * someone sends us a request after we have removed/freed an rsb.
+ * (They sent a request instead of lookup because they are using
+ * an rsb taken from their scan list.)
*/
if (from_local || from_dir ||
@@ -792,15 +733,8 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
retry:
- if (create) {
- error = pre_rsb_struct(ls);
- if (error < 0)
- goto out;
- }
- retry_lookup:
-
- /* check if the rsb is in keep state under read lock - likely path */
+ /* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
@@ -812,9 +746,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
- if (rsb_flag(r, RSB_TOSS)) {
+ if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
- goto do_toss;
+ goto do_inactive;
}
kref_get(&r->res_ref);
@@ -822,17 +756,29 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out;
- do_toss:
+ do_inactive:
write_lock_bh(&ls->ls_rsbtbl_lock);
- /* retry lookup under write lock to see if its still in toss state
- * if not it's in keep state and we relookup - unlikely path.
+ /*
+ * The expectation here is that the rsb will have HASHED and
+ * INACTIVE flags set, and that the rsb can be moved from
+ * inactive back to active again. However, between releasing
+ * the read lock and acquiring the write lock, this rsb could
+ * have been removed from rsbtbl, and had HASHED cleared, to
+ * be freed. To deal with this case, we would normally need
+ * to repeat dlm_search_rsb_tree while holding the write lock,
+ * but rcu allows us to simply check the HASHED flag, because
+ * the rcu read lock means the rsb will not be freed yet.
+ * If the HASHED flag is not set, then the rsb is being freed,
+ * so we add a new rsb struct. If the HASHED flag is set,
+ * and INACTIVE is not set, it means another thread has
+ * made the rsb active, as we're expecting to do here, and
+ * we just repeat the lookup (this will be very unlikely.)
*/
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
- if (!error) {
- if (!rsb_flag(r, RSB_TOSS)) {
+ if (rsb_flag(r, RSB_HASHED)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock);
- goto retry_lookup;
+ goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -842,14 +788,14 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
/*
* rsb found inactive (master_nodeid may be out of date unless
* we are the dir_nodeid or were the master) No other thread
- * is using this rsb because it's on the toss list, so we can
+ * is using this rsb because it's inactive, so we can
* look at or update res_master_nodeid without lock_rsb.
*/
if ((r->res_master_nodeid != our_nodeid) && from_other) {
/* our rsb was not master, and another node (not the dir node)
has sent us a request */
- log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
+ log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name);
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -859,7 +805,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
if ((r->res_master_nodeid != our_nodeid) && from_dir) {
/* don't think this should ever happen */
- log_error(ls, "find_rsb toss from_dir %d master %d",
+ log_error(ls, "find_rsb inactive from_dir %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
/* fix it and go on */
@@ -876,14 +822,12 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
r->res_first_lkid = 0;
}
- list_move(&r->res_rsbs_list, &ls->ls_keep);
- rsb_clear_flag(r, RSB_TOSS);
- /* rsb got out of toss state, it becomes alive again
- * and we reinit the reference counter that is only
- * valid for keep state rsbs
- */
- kref_init(&r->res_ref);
- rsb_delete_toss_timer(ls, r);
+ /* A dir record will not be on the scan list. */
+ if (r->res_dir_nodeid != our_nodeid)
+ del_scan(ls, r);
+ list_move(&r->res_slow_list, &ls->ls_slow_active);
+ rsb_clear_flag(r, RSB_INACTIVE);
+ kref_init(&r->res_ref); /* ref is now used in active state */
write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out;
@@ -898,9 +842,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out;
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN)
- goto retry;
- if (error)
+ if (WARN_ON_ONCE(error))
goto out;
r->res_hash = hash;
@@ -952,9 +894,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
- goto retry_lookup;
+ goto retry;
} else if (!error) {
- list_add(&r->res_rsbs_list, &ls->ls_keep);
+ list_add(&r->res_slow_list, &ls->ls_slow_active);
}
write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
@@ -976,13 +918,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
int error;
retry:
- error = pre_rsb_struct(ls);
- if (error < 0)
- goto out;
- retry_lookup:
-
- /* check if the rsb is in keep state under read lock - likely path */
+ /* check if the rsb is in active state under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error) {
@@ -990,9 +927,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
goto do_new;
}
- if (rsb_flag(r, RSB_TOSS)) {
+ if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
- goto do_toss;
+ goto do_inactive;
}
/*
@@ -1005,17 +942,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
goto out;
- do_toss:
+ do_inactive:
write_lock_bh(&ls->ls_rsbtbl_lock);
- /* retry lookup under write lock to see if its still in toss state
- * if not it's in keep state and we relookup - unlikely path.
- */
- error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
- if (!error) {
- if (!rsb_flag(r, RSB_TOSS)) {
+ /* See comment in find_rsb_dir. */
+ if (rsb_flag(r, RSB_HASHED)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock);
- goto retry_lookup;
+ goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1025,14 +959,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
/*
* rsb found inactive. No other thread is using this rsb because
- * it's on the toss list, so we can look at or update
- * res_master_nodeid without lock_rsb.
+ * it's inactive, so we can look at or update res_master_nodeid
+ * without lock_rsb.
*/
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
/* our rsb is not master, and another node has sent us a
request; this should never happen */
- log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
+ log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1044,21 +978,17 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
(dir_nodeid == our_nodeid)) {
/* our rsb is not master, and we are dir; may as well fix it;
this should never happen */
- log_error(ls, "find_rsb toss our %d master %d dir %d",
+ log_error(ls, "find_rsb inactive our %d master %d dir %d",
our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
}
- list_move(&r->res_rsbs_list, &ls->ls_keep);
- rsb_clear_flag(r, RSB_TOSS);
- /* rsb got out of toss state, it becomes alive again
- * and we reinit the reference counter that is only
- * valid for keep state rsbs
- */
+ list_move(&r->res_slow_list, &ls->ls_slow_active);
+ rsb_clear_flag(r, RSB_INACTIVE);
kref_init(&r->res_ref);
- rsb_delete_toss_timer(ls, r);
+ del_scan(ls, r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out;
@@ -1070,10 +1000,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
*/
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN) {
- goto retry;
- }
- if (error)
+ if (WARN_ON_ONCE(error))
goto out;
r->res_hash = hash;
@@ -1090,9 +1017,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
- goto retry_lookup;
+ goto retry;
} else if (!error) {
- list_add(&r->res_rsbs_list, &ls->ls_keep);
+ list_add(&r->res_slow_list, &ls->ls_slow_active);
}
write_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1101,12 +1028,54 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
return error;
}
+/*
+ * rsb rcu usage
+ *
+ * While rcu read lock is held, the rsb cannot be freed,
+ * which allows a lookup optimization.
+ *
+ * Two threads are accessing the same rsb concurrently,
+ * the first (A) is trying to use the rsb, the second (B)
+ * is trying to free the rsb.
+ *
+ * thread A thread B
+ * (trying to use rsb) (trying to free rsb)
+ *
+ * A1. rcu read lock
+ * A2. rsbtbl read lock
+ * A3. look up rsb in rsbtbl
+ * A4. rsbtbl read unlock
+ * B1. rsbtbl write lock
+ * B2. look up rsb in rsbtbl
+ * B3. remove rsb from rsbtbl
+ * B4. clear rsb HASHED flag
+ * B5. rsbtbl write unlock
+ * B6. begin freeing rsb using rcu...
+ *
+ * (rsb is inactive, so try to make it active again)
+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
+ * A6. the rsb HASHED flag is not set, which it means the rsb
+ * is being removed from rsbtbl and freed, so don't use it.
+ * A7. rcu read unlock
+ *
+ * B7. ...finish freeing rsb using rcu
+ * A8. create a new rsb
+ *
+ * Without the rcu optimization, steps A5-8 would need to do
+ * an extra rsbtbl lookup:
+ * A5. rsbtbl write lock
+ * A6. look up rsb in rsbtbl, not found
+ * A7. rsbtbl write unlock
+ * A8. create a new rsb
+ */
+
static int find_rsb(struct dlm_ls *ls, const void *name, int len,
int from_nodeid, unsigned int flags,
struct dlm_rsb **r_ret)
{
int dir_nodeid;
uint32_t hash;
+ int rv;
if (len > DLM_RESNAME_MAXLEN)
return -EINVAL;
@@ -1114,12 +1083,15 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
hash = jhash(name, len, 0);
dir_nodeid = dlm_hash2nodeid(ls, hash);
+ rcu_read_lock();
if (dlm_no_directory(ls))
- return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
+ rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
from_nodeid, flags, r_ret);
else
- return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+ rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
from_nodeid, flags, r_ret);
+ rcu_read_unlock();
+ return rv;
}
/* we have received a request and found that res_master_nodeid != our_nodeid,
@@ -1166,7 +1138,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
}
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
- int from_nodeid, bool toss_list, unsigned int flags,
+ int from_nodeid, bool is_inactive, unsigned int flags,
int *r_nodeid, int *result)
{
int fix_master = (flags & DLM_LU_RECOVER_MASTER);
@@ -1190,9 +1162,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
r->res_nodeid = from_nodeid;
rsb_set_flag(r, RSB_NEW_MASTER);
- if (toss_list) {
- /* I don't think we should ever find it on toss list. */
- log_error(ls, "%s fix_master on toss", __func__);
+ if (is_inactive) {
+ /* I don't think we should ever find it inactive. */
+ log_error(ls, "%s fix_master inactive", __func__);
dlm_dump_rsb(r);
}
}
@@ -1232,7 +1204,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
if (!from_master && !fix_master &&
(r->res_master_nodeid == from_nodeid)) {
/* this can happen when the master sends remove, the dir node
- * finds the rsb on the keep list and ignores the remove,
+ * finds the rsb on the active list and ignores the remove,
* and the former master sends a lookup
*/
@@ -1276,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
* . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/
-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
- int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result)
{
struct dlm_rsb *r = NULL;
uint32_t hash;
@@ -1304,19 +1276,14 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
}
retry:
- error = pre_rsb_struct(ls);
- if (error < 0)
- return error;
-
- retry_lookup:
- /* check if the rsb is in keep state under read lock - likely path */
+ /* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
- if (rsb_flag(r, RSB_TOSS)) {
+ if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
- goto do_toss;
+ goto do_inactive;
}
/* because the rsb is active, we need to lock_rsb before
@@ -1340,53 +1307,48 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto not_found;
}
- do_toss:
+ do_inactive:
/* unlikely path - relookup under write */
write_lock_bh(&ls->ls_rsbtbl_lock);
- /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock
- * check if the rsb is still in toss state, if not relookup
- */
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
- if (!rsb_flag(r, RSB_TOSS)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock);
/* something as changed, very unlikely but
* try again
*/
- goto retry_lookup;
+ goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock);
goto not_found;
}
- /* because the rsb is inactive (on toss list), it's not refcounted
- * and lock_rsb is not used, but is protected by the rsbtbl lock
- */
+ /* because the rsb is inactive, it's not refcounted and lock_rsb
+ is not used, but is protected by the rsbtbl lock */
__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
r_nodeid, result);
- rsb_mod_timer(ls, r);
- /* the rsb was inactive (on toss list) */
+ /* A dir record rsb should never be on scan list. */
+ /* Try to fix this with del_scan? */
+ WARN_ON(!list_empty(&r->res_scan_list));
+
write_unlock_bh(&ls->ls_rsbtbl_lock);
return 0;
not_found:
error = get_rsb_struct(ls, name, len, &r);
- if (error == -EAGAIN)
- goto retry;
- if (error)
+ if (WARN_ON_ONCE(error))
goto out;
r->res_hash = hash;
r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
- kref_init(&r->res_ref);
- rsb_set_flag(r, RSB_TOSS);
+ rsb_set_flag(r, RSB_INACTIVE);
write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl);
@@ -1396,7 +1358,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
- goto retry_lookup;
+ goto retry;
} else if (error) {
write_unlock_bh(&ls->ls_rsbtbl_lock);
/* should never happen */
@@ -1404,8 +1366,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto retry;
}
- list_add(&r->res_rsbs_list, &ls->ls_toss);
- rsb_mod_timer(ls, r);
+ list_add(&r->res_slow_list, &ls->ls_slow_inactive);
write_unlock_bh(&ls->ls_rsbtbl_lock);
if (result)
@@ -1415,12 +1376,22 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
return error;
}
+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+ int len, unsigned int flags, int *r_nodeid, int *result)
+{
+ int rv;
+ rcu_read_lock();
+ rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
+ rcu_read_unlock();
+ return rv;
+}
+
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock);
- list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (r->res_hash == hash)
dlm_dump_rsb(r);
}
@@ -1442,15 +1413,28 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
read_unlock_bh(&ls->ls_rsbtbl_lock);
}
-static void toss_rsb(struct kref *kref)
+static void deactivate_rsb(struct kref *kref)
{
struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
struct dlm_ls *ls = r->res_ls;
+ int our_nodeid = dlm_our_nodeid();
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
- rsb_set_flag(r, RSB_TOSS);
- list_move(&r->res_rsbs_list, &ls->ls_toss);
- rsb_mod_timer(ls, r);
+ rsb_set_flag(r, RSB_INACTIVE);
+ list_move(&r->res_slow_list, &ls->ls_slow_inactive);
+
+ /*
+ * When the rsb becomes unused:
+ * - If it's not a dir record for a remote master rsb,
+ * then it is put on the scan list to be freed.
+ * - If it's a dir record for a remote master rsb,
+ * then it is kept in the inactive state until
+ * receive_remove() from the master node.
+ */
+ if (!dlm_no_directory(ls) &&
+ (r->res_master_nodeid != our_nodeid) &&
+ (dlm_dir_nodeid(r) != our_nodeid))
+ add_scan(ls, r);
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
@@ -1464,22 +1448,22 @@ static void unhold_rsb(struct dlm_rsb *r)
{
int rv;
- /* rsbs in toss state never get referenced */
- WARN_ON(rsb_flag(r, RSB_TOSS));
- rv = kref_put(&r->res_ref, toss_rsb);
+ /* inactive rsbs are not ref counted */
+ WARN_ON(rsb_flag(r, RSB_INACTIVE));
+ rv = kref_put(&r->res_ref, deactivate_rsb);
DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
-void free_toss_rsb(struct dlm_rsb *r)
+void free_inactive_rsb(struct dlm_rsb *r)
{
- WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
+ WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
- DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
@@ -1504,11 +1488,15 @@ static void detach_lkb(struct dlm_lkb *lkb)
}
static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
- int start, int end)
+ unsigned long start, unsigned long end)
{
+ struct xa_limit limit;
struct dlm_lkb *lkb;
int rv;
+ limit.max = end;
+ limit.min = start;
+
lkb = dlm_allocate_lkb(ls);
if (!lkb)
return -ENOMEM;
@@ -1522,14 +1510,12 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
INIT_LIST_HEAD(&lkb->lkb_ownqueue);
INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
- write_lock_bh(&ls->ls_lkbidr_lock);
- rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
- if (rv >= 0)
- lkb->lkb_id = rv;
- write_unlock_bh(&ls->ls_lkbidr_lock);
+ write_lock_bh(&ls->ls_lkbxa_lock);
+ rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
+ write_unlock_bh(&ls->ls_lkbxa_lock);
if (rv < 0) {
- log_error(ls, "create_lkb idr error %d", rv);
+ log_error(ls, "create_lkb xa error %d", rv);
dlm_free_lkb(lkb);
return rv;
}
@@ -1540,18 +1526,18 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
- return _create_lkb(ls, lkb_ret, 1, 0);
+ return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
struct dlm_lkb *lkb;
- read_lock_bh(&ls->ls_lkbidr_lock);
- lkb = idr_find(&ls->ls_lkbidr, lkid);
+ read_lock_bh(&ls->ls_lkbxa_lock);
+ lkb = xa_load(&ls->ls_lkbxa, lkid);
if (lkb)
kref_get(&lkb->lkb_ref);
- read_unlock_bh(&ls->ls_lkbidr_lock);
+ read_unlock_bh(&ls->ls_lkbxa_lock);
*lkb_ret = lkb;
return lkb ? 0 : -ENOENT;
@@ -1576,10 +1562,10 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
int rv;
rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
- &ls->ls_lkbidr_lock);
+ &ls->ls_lkbxa_lock);
if (rv) {
- idr_remove(&ls->ls_lkbidr, lkid);
- write_unlock_bh(&ls->ls_lkbidr_lock);
+ xa_erase(&ls->ls_lkbxa, lkid);
+ write_unlock_bh(&ls->ls_lkbxa_lock);
detach_lkb(lkb);
@@ -4323,8 +4309,9 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return;
}
- /* Look for name in rsb toss state, if it's there, kill it.
- * If it's in non toss state, it's being used, and we should ignore this
+ /*
+ * Look for inactive rsb, if it's there, free it.
+ * If the rsb is active, it's being used, and we should ignore this
* message. This is an expected race between the dir node sending a
* request to the master node at the same time as the master node sends
* a remove to the dir node. The resolution to that race is for the
@@ -4347,16 +4334,18 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return;
}
- if (!rsb_flag(r, RSB_TOSS)) {
+ if (!rsb_flag(r, RSB_INACTIVE)) {
if (r->res_master_nodeid != from_nodeid) {
/* should not happen */
- log_error(ls, "receive_remove keep from %d master %d",
+ log_error(ls, "receive_remove on active rsb from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
+ /* Ignore the remove message, see race comment above. */
+
log_debug(ls, "receive_remove from %d master %d first %x %s",
from_nodeid, r->res_master_nodeid, r->res_first_lkid,
name);
@@ -4365,19 +4354,20 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
}
if (r->res_master_nodeid != from_nodeid) {
- log_error(ls, "receive_remove toss from %d master %d",
+ log_error(ls, "receive_remove inactive from %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
- list_del(&r->res_rsbs_list);
+ list_del(&r->res_slow_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
+ rsb_clear_flag(r, RSB_HASHED);
write_unlock_bh(&ls->ls_rsbtbl_lock);
- free_toss_rsb(r);
+ free_inactive_rsb(r);
}
static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
@@ -5444,7 +5434,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
struct dlm_rsb *r;
read_lock_bh(&ls->ls_rsbtbl_lock);
- list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
+ list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
if (!is_master(r)) {