aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c256
1 files changed, 167 insertions, 89 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c9d2e553a6c4..666a9f274832 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -189,6 +189,7 @@ static int parse_reply_info_dir(void **p, void *end,
info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+ info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
}
if (num == 0)
goto done;
@@ -378,9 +379,9 @@ const char *ceph_session_state_name(int s)
static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
{
- if (atomic_inc_not_zero(&s->s_ref)) {
+ if (refcount_inc_not_zero(&s->s_ref)) {
dout("mdsc get_session %p %d -> %d\n", s,
- atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
+ refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
return s;
} else {
dout("mdsc get_session %p 0 -- FAIL", s);
@@ -391,8 +392,8 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
void ceph_put_mds_session(struct ceph_mds_session *s)
{
dout("mdsc put_session %p %d -> %d\n", s,
- atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
- if (atomic_dec_and_test(&s->s_ref)) {
+ refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
+ if (refcount_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer(s->s_auth.authorizer);
kfree(s);
@@ -411,7 +412,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
return NULL;
session = mdsc->sessions[mds];
dout("lookup_mds_session %p %d\n", session,
- atomic_read(&session->s_ref));
+ refcount_read(&session->s_ref));
get_session(session);
return session;
}
@@ -441,7 +442,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
{
struct ceph_mds_session *s;
- if (mds >= mdsc->mdsmap->m_max_mds)
+ if (mds >= mdsc->mdsmap->m_num_mds)
return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -466,7 +467,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
s->s_trim_caps = 0;
- atomic_set(&s->s_ref, 1);
+ refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
@@ -494,7 +495,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
}
mdsc->sessions[mds] = s;
atomic_inc(&mdsc->num_sessions);
- atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
+ refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */
ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
@@ -547,8 +548,8 @@ void ceph_mdsc_release_request(struct kref *kref)
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
iput(req->r_inode);
}
- if (req->r_locked_dir)
- ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
+ if (req->r_parent)
+ ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
@@ -628,6 +629,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
{
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
+ /* Never leave an unregistered request on an unsafe list! */
+ list_del_init(&req->r_unsafe_item);
+
if (req->r_tid == mdsc->oldest_tid) {
struct rb_node *p = rb_next(&req->r_node);
mdsc->oldest_tid = 0;
@@ -644,13 +648,15 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
erase_request(&mdsc->request_tree, req);
- if (req->r_unsafe_dir && req->r_got_unsafe) {
+ if (req->r_unsafe_dir &&
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_dir_item);
spin_unlock(&ci->i_unsafe_lock);
}
- if (req->r_target_inode && req->r_got_unsafe) {
+ if (req->r_target_inode &&
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_target_item);
@@ -668,6 +674,28 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
}
/*
+ * Walk back up the dentry tree until we hit a dentry representing a
+ * non-snapshot inode. We do this using the rcu_read_lock (which must be held
+ * when calling this) to ensure that the objects won't disappear while we're
+ * working with them. Once we hit a candidate dentry, we attempt to take a
+ * reference to it, and return that as the result.
+ */
+static struct inode *get_nonsnap_parent(struct dentry *dentry)
+{
+ struct inode *inode = NULL;
+
+ while (dentry && !IS_ROOT(dentry)) {
+ inode = d_inode_rcu(dentry);
+ if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
+ break;
+ dentry = dentry->d_parent;
+ }
+ if (inode)
+ inode = igrab(inode);
+ return inode;
+}
+
+/*
* Choose mds to send request to next. If there is a hint set in the
* request (e.g., due to a prior forward hint from the mds), use that.
* Otherwise, consult frag tree and/or caps to identify the
@@ -675,19 +703,6 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
*
* Called under mdsc->mutex.
*/
-static struct dentry *get_nonsnap_parent(struct dentry *dentry)
-{
- /*
- * we don't need to worry about protecting the d_parent access
- * here because we never renaming inside the snapped namespace
- * except to resplice to another snapdir, and either the old or new
- * result is a valid result.
- */
- while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
- dentry = dentry->d_parent;
- return dentry;
-}
-
static int __choose_mds(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
@@ -697,7 +712,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
int mode = req->r_direct_mode;
int mds = -1;
u32 hash = req->r_direct_hash;
- bool is_hash = req->r_direct_is_hash;
+ bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
/*
* is there a specific mds we should try? ignore hint if we have
@@ -717,30 +732,39 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
inode = NULL;
if (req->r_inode) {
inode = req->r_inode;
+ ihold(inode);
} else if (req->r_dentry) {
/* ignore race with rename; old or new d_parent is okay */
- struct dentry *parent = req->r_dentry->d_parent;
- struct inode *dir = d_inode(parent);
+ struct dentry *parent;
+ struct inode *dir;
+
+ rcu_read_lock();
+ parent = req->r_dentry->d_parent;
+ dir = req->r_parent ? : d_inode_rcu(parent);
- if (dir->i_sb != mdsc->fsc->sb) {
- /* not this fs! */
+ if (!dir || dir->i_sb != mdsc->fsc->sb) {
+ /* not this fs or parent went negative */
inode = d_inode(req->r_dentry);
+ if (inode)
+ ihold(inode);
} else if (ceph_snap(dir) != CEPH_NOSNAP) {
/* direct snapped/virtual snapdir requests
* based on parent dir inode */
- struct dentry *dn = get_nonsnap_parent(parent);
- inode = d_inode(dn);
+ inode = get_nonsnap_parent(parent);
dout("__choose_mds using nonsnap parent %p\n", inode);
} else {
/* dentry target */
inode = d_inode(req->r_dentry);
if (!inode || mode == USE_AUTH_MDS) {
/* dir + name */
- inode = dir;
+ inode = igrab(dir);
hash = ceph_dentry_hash(dir, req->r_dentry);
is_hash = true;
+ } else {
+ ihold(inode);
}
}
+ rcu_read_unlock();
}
dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
@@ -769,7 +793,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
(int)r, frag.ndist);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE)
- return mds;
+ goto out;
}
/* since this file/dir wasn't known to be
@@ -784,7 +808,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
inode, ceph_vinop(inode), frag.frag, mds);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE)
- return mds;
+ goto out;
}
}
}
@@ -797,6 +821,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
spin_unlock(&ci->i_ceph_lock);
+ iput(inode);
goto random;
}
mds = cap->session->s_mds;
@@ -804,6 +829,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
inode, ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
+out:
+ iput(inode);
return mds;
random:
@@ -978,7 +1005,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- if (mds >= mdsc->mdsmap->m_max_mds)
+ if (mds >= mdsc->mdsmap->m_num_mds)
return;
mi = &mdsc->mdsmap->m_info[mds];
@@ -1036,7 +1063,6 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
while (!list_empty(&session->s_unsafe)) {
req = list_first_entry(&session->s_unsafe,
struct ceph_mds_request, r_unsafe_item);
- list_del_init(&req->r_unsafe_item);
pr_warn_ratelimited(" dropping unsafe request %llu\n",
req->r_tid);
__unregister_request(mdsc, req);
@@ -1146,7 +1172,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
if (ci->i_wrbuffer_ref > 0 &&
- ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+ READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
invalidate = true;
while (!list_empty(&ci->i_cap_flush_list)) {
@@ -1526,9 +1552,15 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_msg *msg = NULL;
struct ceph_mds_cap_release *head;
struct ceph_mds_cap_item *item;
+ struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
struct ceph_cap *cap;
LIST_HEAD(tmp_list);
int num_cap_releases;
+ __le32 barrier, *cap_barrier;
+
+ down_read(&osdc->lock);
+ barrier = cpu_to_le32(osdc->epoch_barrier);
+ up_read(&osdc->lock);
spin_lock(&session->s_cap_lock);
again:
@@ -1546,7 +1578,11 @@ again:
head = msg->front.iov_base;
head->num = cpu_to_le32(0);
msg->front.iov_len = sizeof(*head);
+
+ msg->hdr.version = cpu_to_le16(2);
+ msg->hdr.compat_version = cpu_to_le16(1);
}
+
cap = list_first_entry(&tmp_list, struct ceph_cap,
session_caps);
list_del(&cap->session_caps);
@@ -1564,6 +1600,11 @@ again:
ceph_put_cap(mdsc, cap);
if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ // Append cap_barrier field
+ cap_barrier = msg->front.iov_base + msg->front.iov_len;
+ *cap_barrier = barrier;
+ msg->front.iov_len += sizeof(*cap_barrier);
+
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
ceph_con_send(&session->s_con, msg);
@@ -1579,6 +1620,11 @@ again:
spin_unlock(&session->s_cap_lock);
if (msg) {
+ // Append cap_barrier field
+ cap_barrier = msg->front.iov_base + msg->front.iov_len;
+ *cap_barrier = barrier;
+ msg->front.iov_len += sizeof(*cap_barrier);
+
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
ceph_con_send(&session->s_con, msg);
@@ -1659,7 +1705,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- req->r_stamp = current_fs_time(mdsc->fsc->sb);
+ req->r_stamp = timespec_trunc(current_kernel_time(), mdsc->fsc->sb->s_time_gran);
req->r_op = op;
req->r_direct_mode = mode;
@@ -1775,18 +1821,23 @@ retry:
return path;
}
-static int build_dentry_path(struct dentry *dentry,
+static int build_dentry_path(struct dentry *dentry, struct inode *dir,
const char **ppath, int *ppathlen, u64 *pino,
int *pfreepath)
{
char *path;
- if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) {
- *pino = ceph_ino(d_inode(dentry->d_parent));
+ rcu_read_lock();
+ if (!dir)
+ dir = d_inode_rcu(dentry->d_parent);
+ if (dir && ceph_snap(dir) == CEPH_NOSNAP) {
+ *pino = ceph_ino(dir);
+ rcu_read_unlock();
*ppath = dentry->d_name.name;
*ppathlen = dentry->d_name.len;
return 0;
}
+ rcu_read_unlock();
path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
if (IS_ERR(path))
return PTR_ERR(path);
@@ -1822,8 +1873,8 @@ static int build_inode_path(struct inode *inode,
* an explicit ino+path.
*/
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
- const char *rpath, u64 rino,
- const char **ppath, int *pathlen,
+ struct inode *rdiri, const char *rpath,
+ u64 rino, const char **ppath, int *pathlen,
u64 *ino, int *freepath)
{
int r = 0;
@@ -1833,7 +1884,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
ceph_snap(rinode));
} else if (rdentry) {
- r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
+ r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
+ freepath);
dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
*ppath);
} else if (rpath || rino) {
@@ -1866,7 +1918,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
int ret;
ret = set_request_path_attr(req->r_inode, req->r_dentry,
- req->r_path1, req->r_ino1.ino,
+ req->r_parent, req->r_path1, req->r_ino1.ino,
&path1, &pathlen1, &ino1, &freepath1);
if (ret < 0) {
msg = ERR_PTR(ret);
@@ -1874,6 +1926,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
}
ret = set_request_path_attr(NULL, req->r_old_dentry,
+ req->r_old_dentry_dir,
req->r_path2, req->r_ino2.ino,
&path2, &pathlen2, &ino2, &freepath2);
if (ret < 0) {
@@ -1927,10 +1980,13 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
mds, req->r_inode_drop, req->r_inode_unless, 0);
if (req->r_dentry_drop)
releases += ceph_encode_dentry_release(&p, req->r_dentry,
- mds, req->r_dentry_drop, req->r_dentry_unless);
+ req->r_parent, mds, req->r_dentry_drop,
+ req->r_dentry_unless);
if (req->r_old_dentry_drop)
releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
- mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
+ req->r_old_dentry_dir, mds,
+ req->r_old_dentry_drop,
+ req->r_old_dentry_unless);
if (req->r_old_inode_drop)
releases += ceph_encode_inode_release(&p,
d_inode(req->r_old_dentry),
@@ -1956,7 +2012,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_pagelist) {
struct ceph_pagelist *pagelist = req->r_pagelist;
- atomic_inc(&pagelist->refcnt);
+ refcount_inc(&pagelist->refcnt);
ceph_msg_data_add_pagelist(msg, pagelist);
msg->hdr.data_len = cpu_to_le32(pagelist->length);
} else {
@@ -2012,7 +2068,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
- if (req->r_got_unsafe) {
+ if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
void *p;
/*
* Replay. Do not regenerate message (and rebuild
@@ -2061,16 +2117,16 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
rhead = msg->front.iov_base;
rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
- if (req->r_got_unsafe)
+ if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
flags |= CEPH_MDS_FLAG_REPLAY;
- if (req->r_locked_dir)
+ if (req->r_parent)
flags |= CEPH_MDS_FLAG_WANT_DENTRY;
rhead->flags = cpu_to_le32(flags);
rhead->num_fwd = req->r_num_fwd;
rhead->num_retry = req->r_attempts - 1;
rhead->ino = 0;
- dout(" r_locked_dir = %p\n", req->r_locked_dir);
+ dout(" r_parent = %p\n", req->r_parent);
return 0;
}
@@ -2084,8 +2140,8 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1;
int err = 0;
- if (req->r_err || req->r_got_result) {
- if (req->r_aborted)
+ if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
+ if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
__unregister_request(mdsc, req);
goto out;
}
@@ -2096,12 +2152,12 @@ static int __do_request(struct ceph_mds_client *mdsc,
err = -EIO;
goto finish;
}
- if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
dout("do_request forced umount\n");
err = -EIO;
goto finish;
}
- if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
+ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
if (mdsc->mdsmap_err) {
err = mdsc->mdsmap_err;
dout("do_request mdsmap err %d\n", err);
@@ -2215,7 +2271,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p);
- if (req->r_got_unsafe)
+ if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
continue;
if (req->r_attempts > 0)
continue; /* only new requests */
@@ -2250,11 +2306,11 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
dout("do_request on %p\n", req);
- /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
+ /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- if (req->r_locked_dir)
- ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
+ if (req->r_parent)
+ ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
@@ -2289,7 +2345,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
mutex_lock(&mdsc->mutex);
/* only abort if we didn't race with a real reply */
- if (req->r_got_result) {
+ if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
err = le32_to_cpu(req->r_reply_info.head->result);
} else if (err < 0) {
dout("aborted request %lld with %d\n", req->r_tid, err);
@@ -2301,10 +2357,10 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
*/
mutex_lock(&req->r_fill_mutex);
req->r_err = err;
- req->r_aborted = true;
+ set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex);
- if (req->r_locked_dir &&
+ if (req->r_parent &&
(req->r_op & CEPH_MDS_OP_WRITE))
ceph_invalidate_dir_request(req);
} else {
@@ -2323,7 +2379,7 @@ out:
*/
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
- struct inode *inode = req->r_locked_dir;
+ struct inode *inode = req->r_parent;
dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
@@ -2379,14 +2435,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
/* dup? */
- if ((req->r_got_unsafe && !head->safe) ||
- (req->r_got_safe && head->safe)) {
+ if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
+ (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
pr_warn("got a dup %s reply on %llu from mds%d\n",
head->safe ? "safe" : "unsafe", tid, mds);
mutex_unlock(&mdsc->mutex);
goto out;
}
- if (req->r_got_safe) {
+ if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
pr_warn("got unsafe after safe on %llu from mds%d\n",
tid, mds);
mutex_unlock(&mdsc->mutex);
@@ -2425,10 +2481,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (head->safe) {
- req->r_got_safe = true;
+ set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
__unregister_request(mdsc, req);
- if (req->r_got_unsafe) {
+ if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
/*
* We already handled the unsafe response, now do the
* cleanup. No need to examine the response; the MDS
@@ -2437,7 +2493,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
* useful we could do with a revised return value.
*/
dout("got safe reply %llu, mds%d\n", tid, mds);
- list_del_init(&req->r_unsafe_item);
/* last unsafe request during umount? */
if (mdsc->stopping && !__get_oldest_req(mdsc))
@@ -2446,7 +2501,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
goto out;
}
} else {
- req->r_got_unsafe = true;
+ set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
if (req->r_unsafe_dir) {
struct ceph_inode_info *ci =
@@ -2486,7 +2541,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* insert trace into our cache */
mutex_lock(&req->r_fill_mutex);
current->journal_info = req;
- err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
+ err = ceph_fill_trace(mdsc->fsc->sb, req);
if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP))
@@ -2500,7 +2555,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (realm)
ceph_put_snap_realm(mdsc, realm);
- if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
+ if (err == 0 && req->r_target_inode &&
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
spin_lock(&ci->i_unsafe_lock);
list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
@@ -2508,12 +2564,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
}
out_err:
mutex_lock(&mdsc->mutex);
- if (!req->r_aborted) {
+ if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
if (err) {
req->r_err = err;
} else {
req->r_reply = ceph_msg_get(msg);
- req->r_got_result = true;
+ set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
}
} else {
dout("reply arrived after request %lld was aborted\n", tid);
@@ -2557,7 +2613,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
goto out; /* dup reply? */
}
- if (req->r_aborted) {
+ if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
dout("forward tid %llu aborted, unregistering\n", tid);
__unregister_request(mdsc, req);
} else if (fwd_seq <= req->r_num_fwd) {
@@ -2567,7 +2623,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
/* resend. forward race not possible; mds would drop */
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
BUG_ON(req->r_err);
- BUG_ON(req->r_got_result);
+ BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
req->r_attempts = 0;
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
@@ -2603,8 +2659,10 @@ static void handle_session(struct ceph_mds_session *session,
seq = le64_to_cpu(h->seq);
mutex_lock(&mdsc->mutex);
- if (op == CEPH_SESSION_CLOSE)
+ if (op == CEPH_SESSION_CLOSE) {
+ get_session(session);
__unregister_session(mdsc, session);
+ }
/* FIXME: this ttl calculation is generous */
session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
mutex_unlock(&mdsc->mutex);
@@ -2693,6 +2751,8 @@ static void handle_session(struct ceph_mds_session *session,
kick_requests(mdsc, mds);
mutex_unlock(&mdsc->mutex);
}
+ if (op == CEPH_SESSION_CLOSE)
+ ceph_put_mds_session(session);
return;
bad:
@@ -2732,7 +2792,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p);
- if (req->r_got_unsafe)
+ if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
continue;
if (req->r_attempts == 0)
continue; /* only old requests */
@@ -3072,7 +3132,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
dout("check_new_map new %u old %u\n",
newmap->m_epoch, oldmap->m_epoch);
- for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
if (mdsc->sessions[i] == NULL)
continue;
s = mdsc->sessions[i];
@@ -3086,15 +3146,33 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_max_mds ||
+ if (i >= newmap->m_num_mds ||
memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
/* the session never opened, just close it
* out now */
+ get_session(s);
+ __unregister_session(mdsc, s);
__wake_requests(mdsc, &s->s_waiting);
+ ceph_put_mds_session(s);
+ } else if (i >= newmap->m_num_mds) {
+ /* force close session for stopped mds */
+ get_session(s);
__unregister_session(mdsc, s);
+ __wake_requests(mdsc, &s->s_waiting);
+ kick_requests(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+
+ mutex_lock(&s->s_mutex);
+ cleanup_session_requests(mdsc, s);
+ remove_session_caps(s);
+ mutex_unlock(&s->s_mutex);
+
+ ceph_put_mds_session(s);
+
+ mutex_lock(&mdsc->mutex);
} else {
/* just close it */
mutex_unlock(&mdsc->mutex);
@@ -3132,7 +3210,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
}
}
- for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i];
if (!s)
continue;
@@ -3556,7 +3634,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
u64 want_tid, want_flush;
- if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return;
dout("sync\n");
@@ -3587,7 +3665,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
*/
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
- if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
return true;
return atomic_read(&mdsc->num_sessions) <= skipped;
}
@@ -3691,13 +3769,13 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
struct ceph_mds_client *mdsc = fsc->mdsc;
-
dout("mdsc_destroy %p\n", mdsc);
- ceph_mdsc_stop(mdsc);
/* flush out any connection work with references to us */
ceph_msgr_flush();
+ ceph_mdsc_stop(mdsc);
+
fsc->mdsc = NULL;
kfree(mdsc);
dout("mdsc_destroy %p done\n", mdsc);
@@ -3846,7 +3924,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
struct ceph_mds_session *s = con->private;
if (get_session(s)) {
- dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
+ dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
return con;
}
dout("mdsc con_get %p FAIL\n", s);
@@ -3857,7 +3935,7 @@ static void con_put(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
+ dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
}