diff options
Diffstat (limited to 'fs/gfs2/glock.c')
| -rw-r--r-- | fs/gfs2/glock.c | 255 |
1 files changed, 146 insertions, 109 deletions
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 94f50cac91c6..c38ab6c81898 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -73,16 +73,16 @@ static DEFINE_SPINLOCK(lru_lock); static struct rhashtable_params ht_parms = { .nelem_hint = GFS2_GL_HASH_SIZE * 3 / 4, - .key_len = sizeof(struct lm_lockname), + .key_len = offsetofend(struct lm_lockname, ln_type), .key_offset = offsetof(struct gfs2_glock, gl_name), .head_offset = offsetof(struct gfs2_glock, gl_node), }; static struct rhashtable gl_hash_table; -void gfs2_glock_free(struct gfs2_glock *gl) +static void gfs2_glock_dealloc(struct rcu_head *rcu) { - struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + struct gfs2_glock *gl = container_of(rcu, struct gfs2_glock, gl_rcu); if (gl->gl_ops->go_flags & GLOF_ASPACE) { kmem_cache_free(gfs2_glock_aspace_cachep, gl); @@ -90,6 +90,13 @@ void gfs2_glock_free(struct gfs2_glock *gl) kfree(gl->gl_lksb.sb_lvbptr); kmem_cache_free(gfs2_glock_cachep, gl); } +} + +void gfs2_glock_free(struct gfs2_glock *gl) +{ + struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; + + call_rcu(&gl->gl_rcu, gfs2_glock_dealloc); if (atomic_dec_and_test(&sdp->sd_glock_disposal)) wake_up(&sdp->sd_glock_wait); } @@ -152,20 +159,34 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) spin_unlock(&lru_lock); } -/** - * gfs2_glock_put() - Decrement reference count on glock - * @gl: The glock to put - * +/* + * Enqueue the glock on the work queue. Passes one glock reference on to the + * work queue. */ +static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) { + /* + * We are holding the lockref spinlock, and the work was still + * queued above. The queued work (glock_work_func) takes that + * spinlock before dropping its glock reference(s), so it + * cannot have dropped them in the meantime. + */ + GLOCK_BUG_ON(gl, gl->gl_lockref.count < 2); + gl->gl_lockref.count--; + } +} -void gfs2_glock_put(struct gfs2_glock *gl) +static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + spin_lock(&gl->gl_lockref.lock); + __gfs2_glock_queue_work(gl, delay); + spin_unlock(&gl->gl_lockref.lock); +} + +static void __gfs2_glock_put(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; struct address_space *mapping = gfs2_glock2aspace(gl); - if (lockref_put_or_lock(&gl->gl_lockref)) - return; - lockref_mark_dead(&gl->gl_lockref); gfs2_glock_remove_from_lru(gl); @@ -178,6 +199,20 @@ void gfs2_glock_put(struct gfs2_glock *gl) } /** + * gfs2_glock_put() - Decrement reference count on glock + * @gl: The glock to put + * + */ + +void gfs2_glock_put(struct gfs2_glock *gl) +{ + if (lockref_put_or_lock(&gl->gl_lockref)) + return; + + __gfs2_glock_put(gl); +} + +/** * may_grant - check if its ok to grant a new lock * @gl: The glock * @gh: The lock request which we wish to grant @@ -449,6 +484,9 @@ __acquires(&gl->gl_lockref.lock) unsigned int lck_flags = (unsigned int)(gh ? gh->gh_flags : 0); int ret; + if (unlikely(test_bit(SDF_SHUTDOWN, &sdp->sd_flags)) && + target != LM_ST_UNLOCKED) + return; lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP | LM_FLAG_PRIORITY); GLOCK_BUG_ON(gl, gl->gl_state == target); @@ -479,17 +517,16 @@ __acquires(&gl->gl_lockref.lock) target == LM_ST_UNLOCKED && test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) { finish_xmote(gl, target); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); + gfs2_glock_queue_work(gl, 0); } else if (ret) { pr_err("lm_lock ret %d\n", ret); - GLOCK_BUG_ON(gl, 1); + GLOCK_BUG_ON(gl, !test_bit(SDF_SHUTDOWN, + &sdp->sd_flags)); } } else { /* lock_nolock */ finish_xmote(gl, target); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); + gfs2_glock_queue_work(gl, 0); } spin_lock(&gl->gl_lockref.lock); @@ -561,8 +598,7 @@ out_sched: clear_bit(GLF_LOCK, &gl->gl_flags); smp_mb__after_atomic(); gl->gl_lockref.count++; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); return; out_unlock: @@ -597,11 +633,11 @@ static void glock_work_func(struct work_struct *work) { unsigned long delay = 0; struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work); - int drop_ref = 0; + unsigned int drop_refs = 1; if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) { finish_xmote(gl, gl->gl_reply); - drop_ref = 1; + drop_refs++; } spin_lock(&gl->gl_lockref.lock); if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && @@ -619,17 +655,25 @@ static void glock_work_func(struct work_struct *work) } } run_queue(gl, 0); - spin_unlock(&gl->gl_lockref.lock); - if (!delay) - gfs2_glock_put(gl); - else { + if (delay) { + /* Keep one glock reference for the work we requeue. */ + drop_refs--; if (gl->gl_name.ln_type != LM_TYPE_INODE) delay = 0; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); + __gfs2_glock_queue_work(gl, delay); } - if (drop_ref) - gfs2_glock_put(gl); + + /* + * Drop the remaining glock references manually here. (Mind that + * __gfs2_glock_queue_work depends on the lockref spinlock begin held + * here as well.) + */ + gl->gl_lockref.count -= drop_refs; + if (!gl->gl_lockref.count) { + __gfs2_glock_put(gl); + return; + } + spin_unlock(&gl->gl_lockref.lock); } /** @@ -653,14 +697,16 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, struct lm_lockname name = { .ln_number = number, .ln_type = glops->go_type, .ln_sbd = sdp }; - struct gfs2_glock *gl, *tmp = NULL; + struct gfs2_glock *gl, *tmp; struct address_space *mapping; struct kmem_cache *cachep; - int ret, tries = 0; + int ret = 0; + rcu_read_lock(); gl = rhashtable_lookup_fast(&gl_hash_table, &name, ht_parms); if (gl && !lockref_get_not_dead(&gl->gl_lockref)) gl = NULL; + rcu_read_unlock(); *glp = gl; if (gl) @@ -719,32 +765,32 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number, } again: - ret = rhashtable_lookup_insert_fast(&gl_hash_table, &gl->gl_node, - ht_parms); - if (ret == 0) { + rcu_read_lock(); + tmp = rhashtable_lookup_get_insert_fast(&gl_hash_table, &gl->gl_node, + ht_parms); + if (!tmp) { *glp = gl; - return 0; + goto out; } - - if (ret == -EEXIST) { - ret = 0; - tmp = rhashtable_lookup_fast(&gl_hash_table, &name, ht_parms); - if (tmp == NULL || !lockref_get_not_dead(&tmp->gl_lockref)) { - if (++tries < 100) { - cond_resched(); - goto again; - } - tmp = NULL; - ret = -ENOMEM; - } - } else { - WARN_ON_ONCE(ret); + if (IS_ERR(tmp)) { + ret = PTR_ERR(tmp); + goto out_free; } + if (lockref_get_not_dead(&tmp->gl_lockref)) { + *glp = tmp; + goto out_free; + } + rcu_read_unlock(); + cond_resched(); + goto again; + +out_free: kfree(gl->gl_lksb.sb_lvbptr); kmem_cache_free(cachep, gl); atomic_dec(&sdp->sd_glock_disposal); - *glp = tmp; +out: + rcu_read_unlock(); return ret; } @@ -980,8 +1026,7 @@ int gfs2_glock_nq(struct gfs2_holder *gh) test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) { set_bit(GLF_REPLY_PENDING, &gl->gl_flags); gl->gl_lockref.count++; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); } run_queue(gl, 1); spin_unlock(&gl->gl_lockref.lock); @@ -1041,17 +1086,15 @@ void gfs2_glock_dq(struct gfs2_holder *gh) gfs2_glock_add_to_lru(gl); trace_gfs2_glock_queue(gh, 0); + if (unlikely(!fast_path)) { + gl->gl_lockref.count++; + if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && + !test_bit(GLF_DEMOTE, &gl->gl_flags) && + gl->gl_name.ln_type == LM_TYPE_INODE) + delay = gl->gl_hold_time; + __gfs2_glock_queue_work(gl, delay); + } spin_unlock(&gl->gl_lockref.lock); - if (likely(fast_path)) - return; - - gfs2_glock_hold(gl); - if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && - !test_bit(GLF_DEMOTE, &gl->gl_flags) && - gl->gl_name.ln_type == LM_TYPE_INODE) - delay = gl->gl_hold_time; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); } void gfs2_glock_dq_wait(struct gfs2_holder *gh) @@ -1227,9 +1270,8 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) spin_lock(&gl->gl_lockref.lock); handle_callback(gl, state, delay, true); + __gfs2_glock_queue_work(gl, delay); spin_unlock(&gl->gl_lockref.lock); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); } /** @@ -1288,10 +1330,8 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret) gl->gl_lockref.count++; set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); - - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); } static int glock_cmp(void *priv, struct list_head *a, struct list_head *b) @@ -1349,8 +1389,7 @@ add_back_to_lru: if (demote_ok(gl)) handle_callback(gl, LM_ST_UNLOCKED, 0, false); WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags)); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); cond_resched_lock(&lru_lock); } @@ -1420,26 +1459,32 @@ static struct shrinker glock_shrinker = { * @sdp: the filesystem * @bucket: the bucket * + * Note that the function can be called multiple times on the same + * object. So the user must ensure that the function can cope with + * that. */ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) { struct gfs2_glock *gl; - struct rhash_head *pos; - const struct bucket_table *tbl; - int i; + struct rhashtable_iter iter; - rcu_read_lock(); - tbl = rht_dereference_rcu(gl_hash_table.tbl, &gl_hash_table); - for (i = 0; i < tbl->size; i++) { - rht_for_each_entry_rcu(gl, pos, tbl, i, gl_node) { + rhashtable_walk_enter(&gl_hash_table, &iter); + + do { + gl = ERR_PTR(rhashtable_walk_start(&iter)); + if (gl) + continue; + + while ((gl = rhashtable_walk_next(&iter)) && !IS_ERR(gl)) if ((gl->gl_name.ln_sbd == sdp) && lockref_get_not_dead(&gl->gl_lockref)) examiner(gl); - } - } - rcu_read_unlock(); - cond_resched(); + + rhashtable_walk_stop(&iter); + } while (cond_resched(), gl == ERR_PTR(-EAGAIN)); + + rhashtable_walk_exit(&iter); } /** @@ -1450,13 +1495,12 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) static void thaw_glock(struct gfs2_glock *gl) { - if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) - goto out; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) { -out: + if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) { gfs2_glock_put(gl); + return; } + set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + gfs2_glock_queue_work(gl, 0); } /** @@ -1472,9 +1516,8 @@ static void clear_glock(struct gfs2_glock *gl) spin_lock(&gl->gl_lockref.lock); if (gl->gl_state != LM_ST_UNLOCKED) handle_callback(gl, LM_ST_UNLOCKED, 0, false); + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); } /** @@ -1802,16 +1845,18 @@ void gfs2_glock_exit(void) static void gfs2_glock_iter_next(struct gfs2_glock_iter *gi) { - do { - gi->gl = rhashtable_walk_next(&gi->hti); + while ((gi->gl = rhashtable_walk_next(&gi->hti))) { if (IS_ERR(gi->gl)) { if (PTR_ERR(gi->gl) == -EAGAIN) continue; gi->gl = NULL; + return; } - /* Skip entries for other sb and dead entries */ - } while ((gi->gl) && ((gi->sdp != gi->gl->gl_name.ln_sbd) || - __lockref_is_dead(&gi->gl->gl_lockref))); + /* Skip entries for other sb and dead entries */ + if (gi->sdp == gi->gl->gl_name.ln_sbd && + !__lockref_is_dead(&gi->gl->gl_lockref)) + return; + } } static void *gfs2_glock_seq_start(struct seq_file *seq, loff_t *pos) @@ -1905,10 +1950,10 @@ static const struct seq_operations gfs2_sbstats_seq_ops = { #define GFS2_SEQ_GOODSIZE min(PAGE_SIZE << PAGE_ALLOC_COSTLY_ORDER, 65536UL) -static int gfs2_glocks_open(struct inode *inode, struct file *file) +static int __gfs2_glocks_open(struct inode *inode, struct file *file, + const struct seq_operations *ops) { - int ret = seq_open_private(file, &gfs2_glock_seq_ops, - sizeof(struct gfs2_glock_iter)); + int ret = seq_open_private(file, ops, sizeof(struct gfs2_glock_iter)); if (ret == 0) { struct seq_file *seq = file->private_data; struct gfs2_glock_iter *gi = seq->private; @@ -1919,11 +1964,16 @@ static int gfs2_glocks_open(struct inode *inode, struct file *file) if (seq->buf) seq->size = GFS2_SEQ_GOODSIZE; gi->gl = NULL; - ret = rhashtable_walk_init(&gl_hash_table, &gi->hti, GFP_KERNEL); + rhashtable_walk_enter(&gl_hash_table, &gi->hti); } return ret; } +static int gfs2_glocks_open(struct inode *inode, struct file *file) +{ + return __gfs2_glocks_open(inode, file, &gfs2_glock_seq_ops); +} + static int gfs2_glocks_release(struct inode *inode, struct file *file) { struct seq_file *seq = file->private_data; @@ -1936,20 +1986,7 @@ static int gfs2_glocks_release(struct inode *inode, struct file *file) static int gfs2_glstats_open(struct inode *inode, struct file *file) { - int ret = seq_open_private(file, &gfs2_glstats_seq_ops, - sizeof(struct gfs2_glock_iter)); - if (ret == 0) { - struct seq_file *seq = file->private_data; - struct gfs2_glock_iter *gi = seq->private; - gi->sdp = inode->i_private; - gi->last_pos = 0; - seq->buf = kmalloc(GFS2_SEQ_GOODSIZE, GFP_KERNEL | __GFP_NOWARN); - if (seq->buf) - seq->size = GFS2_SEQ_GOODSIZE; - gi->gl = NULL; - ret = rhashtable_walk_init(&gl_hash_table, &gi->hti, GFP_KERNEL); - } - return ret; + return __gfs2_glocks_open(inode, file, &gfs2_glstats_seq_ops); } static int gfs2_sbstats_open(struct inode *inode, struct file *file) |