summary | refs | log | tree | commit | diff | stats
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- fs/ceph/mds_client.c 341
1 files changed, 214 insertions, 127 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 920e9f048bd8..bbbbddf71326 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -9,6 +9,7 @@
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
+#include <linux/bits.h>
#include "super.h"
#include "mds_client.h"
@@ -384,8 +385,8 @@ static int parse_reply_info_readdir(void **p, void *end,
}
done:
- if (*p != end)
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
bad:
@@ -406,12 +407,10 @@ static int parse_reply_info_filelock(void **p, void *end,
goto bad;
info->filelock_reply = *p;
- *p += sizeof(*info->filelock_reply);
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -425,18 +424,21 @@ static int parse_reply_info_create(void **p, void *end,
{
if (features == (u64)-1 ||
(features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
+ /* Malformed reply? */
if (*p == end) {
info->has_create_ino = false;
} else {
info->has_create_ino = true;
- info->ino = ceph_decode_64(p);
+ ceph_decode_64_safe(p, end, info->ino, bad);
}
+ } else {
+ if (*p != end)
+ goto bad;
}
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -529,6 +531,7 @@ const char *ceph_session_state_name(int s)
case CEPH_MDS_SESSION_OPEN: return "open";
case CEPH_MDS_SESSION_HUNG: return "hung";
case CEPH_MDS_SESSION_CLOSING: return "closing";
+ case CEPH_MDS_SESSION_CLOSED: return "closed";
case CEPH_MDS_SESSION_RESTARTING: return "restarting";
case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
case CEPH_MDS_SESSION_REJECTED: return "rejected";
@@ -536,7 +539,7 @@ const char *ceph_session_state_name(int s)
}
}
-static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
if (refcount_inc_not_zero(&s->s_ref)) {
dout("mdsc get_session %p %d -> %d\n", s,
@@ -567,7 +570,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
{
if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return NULL;
- return get_session(mdsc->sessions[mds]);
+ return ceph_get_mds_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -596,7 +599,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
{
struct ceph_mds_session *s;
- if (mds >= mdsc->mdsmap->m_num_mds)
+ if (mds >= mdsc->mdsmap->possible_max_rank)
return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -639,7 +642,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_renew_seq = 0;
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
- s->s_trim_caps = 0;
refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
@@ -674,7 +676,6 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
- s->s_state = 0;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
@@ -708,8 +709,10 @@ void ceph_mdsc_release_request(struct kref *kref)
/* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(req->r_inode);
}
- if (req->r_parent)
+ if (req->r_parent) {
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+ ceph_async_iput(req->r_parent);
+ }
ceph_async_iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
@@ -876,7 +879,8 @@ static struct inode *get_nonsnap_parent(struct dentry *dentry)
* Called under mdsc->mutex.
*/
static int __choose_mds(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *req)
+ struct ceph_mds_request *req,
+ bool *random)
{
struct inode *inode;
struct ceph_inode_info *ci;
@@ -886,6 +890,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
u32 hash = req->r_direct_hash;
bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+ if (random)
+ *random = false;
+
/*
* is there a specific mds we should try? ignore hint if we have
* no session and the mds is not up (active or recovering).
@@ -893,7 +900,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
if (req->r_resend_mds >= 0 &&
(__have_session(mdsc, req->r_resend_mds) ||
ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
- dout("choose_mds using resend_mds mds%d\n",
+ dout("%s using resend_mds mds%d\n", __func__,
req->r_resend_mds);
return req->r_resend_mds;
}
@@ -911,7 +918,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
rcu_read_lock();
inode = get_nonsnap_parent(req->r_dentry);
rcu_read_unlock();
- dout("__choose_mds using snapdir's parent %p\n", inode);
+ dout("%s using snapdir's parent %p\n", __func__, inode);
}
} else if (req->r_dentry) {
/* ignore race with rename; old or new d_parent is okay */
@@ -931,7 +938,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
/* direct snapped/virtual snapdir requests
* based on parent dir inode */
inode = get_nonsnap_parent(parent);
- dout("__choose_mds using nonsnap parent %p\n", inode);
+ dout("%s using nonsnap parent %p\n", __func__, inode);
} else {
/* dentry target */
inode = d_inode(req->r_dentry);
@@ -947,8 +954,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
rcu_read_unlock();
}
- dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
- (int)hash, mode);
+ dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
+ hash, mode);
if (!inode)
goto random;
ci = ceph_inode(inode);
@@ -966,30 +973,33 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
get_random_bytes(&r, 1);
r %= frag.ndist;
mds = frag.dist[r];
- dout("choose_mds %p %llx.%llx "
- "frag %u mds%d (%d/%d)\n",
- inode, ceph_vinop(inode),
- frag.frag, mds,
- (int)r, frag.ndist);
+ dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
+ __func__, inode, ceph_vinop(inode),
+ frag.frag, mds, (int)r, frag.ndist);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
- CEPH_MDS_STATE_ACTIVE)
+ CEPH_MDS_STATE_ACTIVE &&
+ !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
goto out;
}
/* since this file/dir wasn't known to be
* replicated, then we want to look for the
* authoritative mds. */
- mode = USE_AUTH_MDS;
if (frag.mds >= 0) {
/* choose auth mds */
mds = frag.mds;
- dout("choose_mds %p %llx.%llx "
- "frag %u mds%d (auth)\n",
- inode, ceph_vinop(inode), frag.frag, mds);
+ dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
+ __func__, inode, ceph_vinop(inode),
+ frag.frag, mds);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
- CEPH_MDS_STATE_ACTIVE)
- goto out;
+ CEPH_MDS_STATE_ACTIVE) {
+ if (mode == USE_ANY_MDS &&
+ !ceph_mdsmap_is_laggy(mdsc->mdsmap,
+ mds))
+ goto out;
+ }
}
+ mode = USE_AUTH_MDS;
}
}
@@ -1005,7 +1015,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
goto random;
}
mds = cap->session->s_mds;
- dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
+ dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
inode, ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
@@ -1016,8 +1026,11 @@ out:
return mds;
random:
+ if (random)
+ *random = true;
+
mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
- dout("choose_mds chose random mds%d\n", mds);
+ dout("%s chose random mds%d\n", __func__, mds);
return mds;
}
@@ -1043,20 +1056,21 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
return msg;
}
+/* Feature bit numbers this client advertises in session open messages. */
+static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
+/*
+ * Size in bytes of a bitmap able to hold bit number feature_bits[c - 1],
+ * rounded up to a whole number of 64-bit words.  @c is an entry count and
+ * must be >= 1 (callers check count > 0 before using this).
+ * NOTE(review): presumably feature_bits[] is sorted ascending so the last
+ * entry is the highest bit -- confirm against the header defining it.
+ */
+#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[(c) - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
{
- static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
- static const size_t count = ARRAY_SIZE(bits);
+ static const size_t count = ARRAY_SIZE(feature_bits);
if (count > 0) {
size_t i;
- size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;
+ size_t size = FEATURE_BYTES(count);
BUG_ON(*p + 4 + size > end);
ceph_encode_32(p, size);
memset(*p, 0, size);
for (i = 0; i < count; i++)
- ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
+ ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
*p += size;
} else {
BUG_ON(*p + 4 > end);
@@ -1077,6 +1091,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
int metadata_key_count = 0;
struct ceph_options *opt = mdsc->fsc->client->options;
struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
+ size_t size, count;
void *p, *end;
const char* metadata[][2] = {
@@ -1094,8 +1109,13 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
strlen(metadata[i][1]);
metadata_key_count++;
}
+
/* supported feature */
- extra_bytes += 4 + 8;
+ size = 0;
+ count = ARRAY_SIZE(feature_bits);
+ if (count > 0)
+ size = FEATURE_BYTES(count);
+ extra_bytes += 4 + size;
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
@@ -1115,7 +1135,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
*
- * ClientSession messages with metadata are v2
+ * ClientSession messages with metadata are v3
*/
msg->hdr.version = cpu_to_le16(3);
msg->hdr.compat_version = cpu_to_le16(1);
@@ -1217,7 +1237,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- if (mds >= mdsc->mdsmap->m_num_mds)
+ if (mds >= mdsc->mdsmap->possible_max_rank)
return;
mi = &mdsc->mdsmap->m_info[mds];
@@ -1270,6 +1290,7 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
{
struct ceph_mds_request *req;
struct rb_node *p;
+ struct ceph_inode_info *ci;
dout("cleanup_session_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
@@ -1278,6 +1299,16 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
struct ceph_mds_request, r_unsafe_item);
pr_warn_ratelimited(" dropping unsafe request %llu\n",
req->r_tid);
+ if (req->r_target_inode) {
+ /* dropping unsafe change of inode's attributes */
+ ci = ceph_inode(req->r_target_inode);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
+ if (req->r_unsafe_dir) {
+ /* dropping unsafe directory operation */
+ ci = ceph_inode(req->r_unsafe_dir);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
__unregister_request(mdsc, req);
}
/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1370,7 +1401,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
- bool drop = false;
+ bool dirty_dropped = false;
bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1383,9 +1414,12 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc = fsc->mdsc;
- if (ci->i_wrbuffer_ref > 0 &&
- READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
- invalidate = true;
+ if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ if (inode->i_data.nrpages > 0)
+ invalidate = true;
+ if (ci->i_wrbuffer_ref > 0)
+ mapping_set_error(&inode->i_data, -EIO);
+ }
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1405,7 +1439,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
- drop = true;
+ dirty_dropped = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
@@ -1415,10 +1449,22 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
- drop = true;
+ dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
+ if (dirty_dropped) {
+ errseq_set(&ci->i_meta_err, -EIO);
+
+ if (ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_wr_ref == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
+ }
+
if (atomic_read(&ci->i_filelock_ref) > 0) {
/* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
@@ -1430,15 +1476,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
-
- if (drop &&
- ci->i_wrbuffer_ref_head == 0 &&
- ci->i_wr_ref == 0 &&
- ci->i_dirty_caps == 0 &&
- ci->i_flushing_caps == 0) {
- ceph_put_snap_context(ci->i_head_snapc);
- ci->i_head_snapc = NULL;
- }
}
spin_unlock(&ci->i_ceph_lock);
while (!list_empty(&to_remove)) {
@@ -1452,7 +1489,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
wake_up_all(&ci->i_cap_wq);
if (invalidate)
ceph_queue_invalidate(inode);
- if (drop)
+ if (dirty_dropped)
iput(inode);
return 0;
}
@@ -1705,11 +1742,11 @@ out:
*/
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
- struct ceph_mds_session *session = arg;
+ int *remaining = arg;
struct ceph_inode_info *ci = ceph_inode(inode);
int used, wanted, oissued, mine;
- if (session->s_trim_caps <= 0)
+ if (*remaining <= 0)
return -1;
spin_lock(&ci->i_ceph_lock);
@@ -1746,7 +1783,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
if (oissued) {
/* we aren't the only cap.. just remove us */
__ceph_remove_cap(cap, true);
- session->s_trim_caps--;
+ (*remaining)--;
} else {
struct dentry *dentry;
/* try dropping referring dentries */
@@ -1758,7 +1795,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
d_prune_aliases(inode);
count = atomic_read(&inode->i_count);
if (count == 1)
- session->s_trim_caps--;
+ (*remaining)--;
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
inode, cap, count);
} else {
@@ -1784,12 +1821,12 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
dout("trim_caps mds%d start: %d / %d, trim %d\n",
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
if (trim_caps > 0) {
- session->s_trim_caps = trim_caps;
- ceph_iterate_session_caps(session, trim_caps_cb, session);
+ int remaining = trim_caps;
+
+ ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
session->s_mds, session->s_nr_caps, max_caps,
- trim_caps - session->s_trim_caps);
- session->s_trim_caps = 0;
+ trim_caps - remaining);
}
ceph_flush_cap_releases(mdsc, session);
@@ -1948,7 +1985,7 @@ void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
if (mdsc->stopping)
return;
- get_session(session);
+ ceph_get_mds_session(session);
if (queue_work(mdsc->fsc->cap_wq,
&session->s_cap_release_work)) {
dout("cap release work queued\n");
@@ -1998,7 +2035,7 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
if (!nr)
return;
val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
- if (!(val % CEPH_CAPS_PER_RELEASE)) {
+ if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
atomic_set(&mdsc->cap_reclaim_pending, 0);
ceph_queue_cap_reclaim_work(mdsc);
}
@@ -2015,12 +2052,13 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
size_t size = sizeof(struct ceph_mds_reply_dir_entry);
- int order, num_entries;
+ unsigned int num_entries;
+ int order;
spin_lock(&ci->i_ceph_lock);
num_entries = ci->i_files + ci->i_subdirs;
spin_unlock(&ci->i_ceph_lock);
- num_entries = max(num_entries, 1);
+ num_entries = max(num_entries, 1U);
num_entries = min(num_entries, opt->max_readdir);
order = get_order(size * num_entries);
@@ -2052,7 +2090,6 @@ struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
- struct timespec64 ts;
if (!req)
return ERR_PTR(-ENOMEM);
@@ -2071,8 +2108,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- ktime_get_coarse_real_ts64(&ts);
- req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
+ ktime_get_coarse_real_ts64(&req->r_stamp);
req->r_op = op;
req->r_direct_mode = mode;
@@ -2165,13 +2201,17 @@ retry:
}
base = ceph_ino(d_inode(temp));
rcu_read_unlock();
- if (pos < 0 || read_seqretry(&rename_lock, seq)) {
- pr_err("build_path did not end path lookup where "
- "expected, pos is %d\n", pos);
- /* presumably this is only possible if racing with a
- rename of one of the parent directories (we can not
- lock the dentries above us to prevent this, but
- retrying should be harmless) */
+
+ if (read_seqretry(&rename_lock, seq))
+ goto retry;
+
+ if (pos < 0) {
+ /*
+ * A rename didn't occur, but somehow we didn't end up where
+ * we thought we would. Throw a warning and try again.
+ */
+ pr_warn("build_path did not end path lookup where "
+ "expected, pos is %d\n", pos);
goto retry;
}
@@ -2328,6 +2368,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->op = cpu_to_le32(req->r_op);
head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
+ head->ino = 0;
head->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
@@ -2493,6 +2534,26 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
}
/*
+ * called under mdsc->mutex
+ */
+static int __send_request(struct ceph_mds_client *mdsc,
+			  struct ceph_mds_session *session,
+			  struct ceph_mds_request *req,
+			  bool drop_cap_releases)
+{
+	/* build/refresh the request message for this session's mds */
+	int ret = __prepare_send_request(mdsc, req, session->s_mds,
+					 drop_cap_releases);
+
+	/* preparation failed: nothing is sent, report the error as-is */
+	if (ret)
+		return ret;
+
+	/* ceph_msg_get() before handing the message to the connection */
+	ceph_msg_get(req->r_request);
+	ceph_con_send(&session->s_con, req->r_request);
+	return 0;
+}
+
+/*
* send request, or put it on the appropriate wait list.
*/
static void __do_request(struct ceph_mds_client *mdsc,
@@ -2501,6 +2562,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session = NULL;
int mds = -1;
int err = 0;
+ bool random;
if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
@@ -2533,15 +2595,14 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (!(mdsc->fsc->mount_options->flags &
CEPH_MOUNT_OPT_MOUNTWAIT) &&
!ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
- err = -ENOENT;
- pr_info("probably no mds server is up\n");
+ err = -EHOSTUNREACH;
goto finish;
}
}
put_request_session(req);
- mds = __choose_mds(mdsc, req);
+ mds = __choose_mds(mdsc, req, &random);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
dout("do_request no mds or not active, waiting for map\n");
@@ -2558,7 +2619,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
goto finish;
}
}
- req->r_session = get_session(session);
+ req->r_session = ceph_get_mds_session(session);
dout("do_request mds%d session %p state %s\n", mds, session,
ceph_session_state_name(session->s_state));
@@ -2569,8 +2630,12 @@ static void __do_request(struct ceph_mds_client *mdsc,
goto out_session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
+ session->s_state == CEPH_MDS_SESSION_CLOSING) {
__open_session(mdsc, session);
+ /* retry the same mds later */
+ if (random)
+ req->r_resend_mds = mds;
+ }
list_add(&req->r_wait, &session->s_waiting);
goto out_session;
}
@@ -2581,11 +2646,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (req->r_request_started == 0) /* note request start time */
req->r_request_started = jiffies;
- err = __prepare_send_request(mdsc, req, mds, false);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
+ err = __send_request(mdsc, session, req, false);
out_session:
ceph_put_mds_session(session);
@@ -2653,8 +2714,10 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- if (req->r_parent)
+ if (req->r_parent) {
ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+ ihold(req->r_parent);
+ }
if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
@@ -2836,7 +2899,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&mdsc->mutex);
goto out;
} else {
- int mds = __choose_mds(mdsc, req);
+ int mds = __choose_mds(mdsc, req, NULL);
if (mds >= 0 && mds != req->r_session->s_mds) {
dout("but auth changed, so resending\n");
__do_request(mdsc, req);
@@ -2852,6 +2915,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
__unregister_request(mdsc, req);
+ /* last request during umount? */
+ if (mdsc->stopping && !__get_oldest_req(mdsc))
+ complete_all(&mdsc->safe_umount_waiters);
+
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
/*
* We already handled the unsafe response, now do the
@@ -2862,9 +2929,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
*/
dout("got safe reply %llu, mds%d\n", tid, mds);
- /* last unsafe request during umount? */
- if (mdsc->stopping && !__get_oldest_req(mdsc))
- complete_all(&mdsc->safe_umount_waiters);
mutex_unlock(&mdsc->mutex);
goto out;
}
@@ -3015,18 +3079,23 @@ bad:
pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
-static int __decode_and_drop_session_metadata(void **p, void *end)
+static int __decode_session_metadata(void **p, void *end,
+ bool *blacklisted)
{
/* map<string,string> */
u32 n;
+ bool err_str;
ceph_decode_32_safe(p, end, n, bad);
while (n-- > 0) {
u32 len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
+ err_str = !strncmp(*p, "error_string", len);
*p += len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
+ if (err_str && strnstr(*p, "blacklisted", len))
+ *blacklisted = true;
*p += len;
}
return 0;
@@ -3050,6 +3119,7 @@ static void handle_session(struct ceph_mds_session *session,
u64 seq;
unsigned long features = 0;
int wake = 0;
+ bool blacklisted = false;
/* decode */
ceph_decode_need(&p, end, sizeof(*h), bad);
@@ -3062,7 +3132,7 @@ static void handle_session(struct ceph_mds_session *session,
if (msg_version >= 3) {
u32 len;
/* version >= 2, metadata */
- if (__decode_and_drop_session_metadata(&p, end) < 0)
+ if (__decode_session_metadata(&p, end, &blacklisted) < 0)
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
@@ -3073,7 +3143,7 @@ static void handle_session(struct ceph_mds_session *session,
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
- get_session(session);
+ ceph_get_mds_session(session);
__unregister_session(mdsc, session);
}
/* FIXME: this ttl calculation is generous */
@@ -3111,6 +3181,7 @@ static void handle_session(struct ceph_mds_session *session,
case CEPH_SESSION_CLOSE:
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info("mds%d reconnect denied\n", session->s_mds);
+ session->s_state = CEPH_MDS_SESSION_CLOSED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
wake = 2; /* for good measure */
@@ -3149,6 +3220,8 @@ static void handle_session(struct ceph_mds_session *session,
session->s_state = CEPH_MDS_SESSION_REJECTED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
+ if (blacklisted)
+ mdsc->fsc->blacklisted = true;
wake = 2; /* for good measure */
break;
@@ -3176,7 +3249,6 @@ bad:
return;
}
-
/*
* called under session->mutex.
*/
@@ -3185,18 +3257,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
{
struct ceph_mds_request *req, *nreq;
struct rb_node *p;
- int err;
dout("replay_unsafe_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
- list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
- err = __prepare_send_request(mdsc, req, session->s_mds, true);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
- }
+ list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
+ __send_request(mdsc, session, req, true);
/*
* also re-send old requests when MDS enters reconnect stage. So that MDS
@@ -3211,14 +3277,8 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
if (req->r_attempts == 0)
continue; /* only old requests */
if (req->r_session &&
- req->r_session->s_mds == session->s_mds) {
- err = __prepare_send_request(mdsc, req,
- session->s_mds, true);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
- }
+ req->r_session->s_mds == session->s_mds)
+ __send_request(mdsc, session, req, true);
}
mutex_unlock(&mdsc->mutex);
}
@@ -3729,7 +3789,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
dout("check_new_map new %u old %u\n",
newmap->m_epoch, oldmap->m_epoch);
- for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
if (!mdsc->sessions[i])
continue;
s = mdsc->sessions[i];
@@ -3743,9 +3803,9 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_num_mds) {
+ if (i >= newmap->possible_max_rank) {
/* force close session for stopped mds */
- get_session(s);
+ ceph_get_mds_session(s);
__unregister_session(mdsc, s);
__wake_requests(mdsc, &s->s_waiting);
mutex_unlock(&mdsc->mutex);
@@ -3800,7 +3860,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
}
}
- for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i];
if (!s)
continue;
@@ -3998,7 +4058,27 @@ static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
mutex_unlock(&mdsc->mutex);
}
+/*
+ * Force a reconnect of the mount when all of the following hold:
+ *  - the CLEANRECOVER mount option is set,
+ *  - the mount state is CEPH_MOUNT_MOUNTED,
+ *  - the client has been flagged as blacklisted,
+ *  - no automatic reconnect was started within the last 30 minutes.
+ */
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
+
+	if (!ceph_test_mount_opt(fsc, CLEANRECOVER) ||
+	    READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED ||
+	    !READ_ONCE(fsc->blacklisted))
+		return;
+
+	/* rate limit: a zero stamp means no auto reconnect was attempted yet */
+	if (fsc->last_auto_reconnect != 0 &&
+	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+		return;
+
+	pr_info("auto reconnect after blacklisted\n");
+	fsc->last_auto_reconnect = jiffies;
+	ceph_force_reconnect(fsc->sb);
+}
/*
* delayed work -- periodically trim expired leases, renew caps with mds
@@ -4044,7 +4124,9 @@ static void delayed_work(struct work_struct *work)
pr_info("mds%d hung\n", s->s_mds);
}
}
- if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+ if (s->s_state == CEPH_MDS_SESSION_NEW ||
+ s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+ s->s_state == CEPH_MDS_SESSION_REJECTED) {
/* this mds is failed or recovering, just wait */
ceph_put_mds_session(s);
continue;
@@ -4072,6 +4154,8 @@ static void delayed_work(struct work_struct *work)
ceph_trim_snapid_map(mdsc);
+ maybe_recover_session(mdsc);
+
schedule_delayed(mdsc);
}
@@ -4114,6 +4198,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
mdsc->last_renew_caps = jiffies;
INIT_LIST_HEAD(&mdsc->cap_delay_list);
+ INIT_LIST_HEAD(&mdsc->cap_wait_list);
spin_lock_init(&mdsc->cap_delay_lock);
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
@@ -4321,7 +4406,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
if (mdsc->sessions[i]) {
- session = get_session(mdsc->sessions[i]);
+ session = ceph_get_mds_session(mdsc->sessions[i]);
__unregister_session(mdsc, session);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
@@ -4355,7 +4440,12 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
session = __ceph_lookup_mds_session(mdsc, mds);
if (!session)
continue;
+
+ if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+ __unregister_session(mdsc, session);
+ __wake_requests(mdsc, &session->s_waiting);
mutex_unlock(&mdsc->mutex);
+
mutex_lock(&session->s_mutex);
__close_session(mdsc, session);
if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -4364,6 +4454,7 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
+
mutex_lock(&mdsc->mutex);
kick_requests(mdsc, mds);
}
@@ -4543,11 +4634,8 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- if (get_session(s)) {
- dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
+ if (ceph_get_mds_session(s))
return con;
- }
- dout("mdsc con_get %p FAIL\n", s);
return NULL;
}
@@ -4555,7 +4643,6 @@ static void con_put(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
}
OpenPOWER on IntegriCloud