summaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/Makefile4
-rw-r--r--fs/ceph/acl.c4
-rw-r--r--fs/ceph/addr.c66
-rw-r--r--fs/ceph/cache.c11
-rw-r--r--fs/ceph/cache.h5
-rw-r--r--fs/ceph/caps.c232
-rw-r--r--fs/ceph/debugfs.c16
-rw-r--r--fs/ceph/dir.c20
-rw-r--r--fs/ceph/export.c60
-rw-r--r--fs/ceph/file.c150
-rw-r--r--fs/ceph/inode.c105
-rw-r--r--fs/ceph/io.c163
-rw-r--r--fs/ceph/io.h12
-rw-r--r--fs/ceph/locks.c11
-rw-r--r--fs/ceph/mds_client.c341
-rw-r--r--fs/ceph/mds_client.h56
-rw-r--r--fs/ceph/mdsmap.c110
-rw-r--r--fs/ceph/snap.c4
-rw-r--r--fs/ceph/super.c746
-rw-r--r--fs/ceph/super.h86
-rw-r--r--fs/ceph/util.c100
-rw-r--r--fs/ceph/xattr.c102
22 files changed, 1488 insertions, 916 deletions
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index a699e320393f..0a0823d378db 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -6,9 +6,9 @@
obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
- export.o caps.o snap.o xattr.o quota.o \
+ export.o caps.o snap.o xattr.o quota.o io.o \
mds_client.o mdsmap.o strings.o ceph_frag.o \
- debugfs.o
+ debugfs.o util.o
ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index aa55f412a6e3..26be6520d3fb 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -222,8 +222,8 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
if (err)
goto out_err;
- err = ceph_pagelist_encode_string(pagelist,
- XATTR_NAME_POSIX_ACL_DEFAULT, len);
+ ceph_pagelist_encode_string(pagelist,
+ XATTR_NAME_POSIX_ACL_DEFAULT, len);
err = posix_acl_to_xattr(&init_user_ns, default_acl,
tmp_buf, val_size2);
if (err < 0)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index e078cc55b989..7ab616601141 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -189,8 +189,7 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
{
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc =
- &ceph_inode_to_client(inode)->client->osdc;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
int err = 0;
u64 off = page_offset(page);
u64 len = PAGE_SIZE;
@@ -219,8 +218,8 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
- err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
- off, &len,
+ err = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
+ &ci->i_layout, off, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
if (err == -ENOENT)
@@ -228,6 +227,8 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
if (err < 0) {
SetPageError(page);
ceph_fscache_readpage_cancel(inode, page);
+ if (err == -EBLACKLISTED)
+ fsc->blacklisted = true;
goto out;
}
if (err < PAGE_SIZE)
@@ -266,6 +267,8 @@ static void finish_read(struct ceph_osd_request *req)
int i;
dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+ if (rc == -EBLACKLISTED)
+ ceph_inode_to_client(inode)->blacklisted = true;
/* unlock all pages, zeroing any data we didn't read */
osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -323,7 +326,8 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
/* caller of readpages does not hold buffer and read caps
* (fadvise, madvise and readahead cases) */
int want = CEPH_CAP_FILE_CACHE;
- ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got);
+ ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
+ true, &got);
if (ret < 0) {
dout("start_read %p, error getting cap\n", inode);
} else if (!(got & want)) {
@@ -569,7 +573,7 @@ static u64 get_writepages_data_length(struct inode *inode,
/*
* Write a single page, but leave the page locked.
*
- * If we get a write error, set the page error bit, but still adjust the
+ * If we get a write error, mark the mapping for error, but still adjust the
* dirty page accounting (i.e., page is no longer dirty).
*/
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
@@ -640,9 +644,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
end_page_writeback(page);
return err;
}
+ if (err == -EBLACKLISTED)
+ fsc->blacklisted = true;
dout("writepage setting page/mapping error %d %p\n",
err, page);
- SetPageError(page);
mapping_set_error(&inode->i_data, err);
wbc->pages_skipped++;
} else {
@@ -680,23 +685,6 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
}
/*
- * lame release_pages helper. release_pages() isn't exported to
- * modules.
- */
-static void ceph_release_pages(struct page **pages, int num)
-{
- struct pagevec pvec;
- int i;
-
- pagevec_init(&pvec);
- for (i = 0; i < num; i++) {
- if (pagevec_add(&pvec, pages[i]) == 0)
- pagevec_release(&pvec);
- }
- pagevec_release(&pvec);
-}
-
-/*
* async writeback completion handler.
*
* If we get an error, set the mapping error bit, but not the individual
@@ -720,6 +708,8 @@ static void writepages_finish(struct ceph_osd_request *req)
if (rc < 0) {
mapping_set_error(mapping, rc);
ceph_set_error_write(ci);
+ if (rc == -EBLACKLISTED)
+ fsc->blacklisted = true;
} else {
ceph_clear_error_write(ci);
}
@@ -769,7 +759,7 @@ static void writepages_finish(struct ceph_osd_request *req)
dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
inode, osd_data->length, rc >= 0 ? num_pages : 0);
- ceph_release_pages(osd_data->pages, num_pages);
+ release_pages(osd_data->pages, num_pages);
}
ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
@@ -913,8 +903,9 @@ get_more_pages:
if (page_offset(page) >= ceph_wbc.i_size) {
dout("%p page eof %llu\n",
page, ceph_wbc.i_size);
- if (ceph_wbc.size_stable ||
- page_offset(page) >= i_size_read(inode))
+ if ((ceph_wbc.size_stable ||
+ page_offset(page) >= i_size_read(inode)) &&
+ clear_page_dirty_for_io(page))
mapping->a_ops->invalidatepage(page,
0, PAGE_SIZE);
unlock_page(page);
@@ -1451,7 +1442,8 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
want = CEPH_CAP_FILE_CACHE;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
+ err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
+ &got, &pinned_page);
if (err < 0)
goto out_restore;
@@ -1539,6 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
if (!prealloc_cf)
return VM_FAULT_OOM;
+ sb_start_pagefault(inode->i_sb);
ceph_block_sigs(&oldset);
if (ci->i_inline_version != CEPH_INLINE_NONE) {
@@ -1567,7 +1560,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
want = CEPH_CAP_FILE_BUFFER;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+ err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
&got, NULL);
if (err < 0)
goto out_free;
@@ -1613,6 +1606,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
ceph_put_cap_refs(ci, got);
out_free:
ceph_restore_sigs(&oldset);
+ sb_end_pagefault(inode->i_sb);
ceph_free_cap_flush(prealloc_cf);
if (err < 0)
ret = vmf_error(err);
@@ -1945,12 +1939,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
if (err >= 0 || err == -ENOENT)
have |= POOL_READ;
- else if (err != -EPERM)
+ else if (err != -EPERM) {
+ if (err == -EBLACKLISTED)
+ fsc->blacklisted = true;
goto out_unlock;
+ }
if (err2 == 0 || err2 == -EEXIST)
have |= POOL_WRITE;
else if (err2 != -EPERM) {
+ if (err2 == -EBLACKLISTED)
+ fsc->blacklisted = true;
err = err2;
goto out_unlock;
}
@@ -1988,10 +1987,11 @@ out:
return err;
}
-int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
+int ceph_pool_perm_check(struct inode *inode, int need)
{
- s64 pool;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_string *pool_ns;
+ s64 pool;
int ret, flags;
if (ci->i_vino.snap != CEPH_NOSNAP) {
@@ -2003,7 +2003,7 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
return 0;
}
- if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
+ if (ceph_test_mount_opt(ceph_inode_to_client(inode),
NOPOOLPERM))
return 0;
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index bc90cf6ad7ed..270b769607a2 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -6,6 +6,9 @@
* Written by Milosz Tanski (milosz@adfin.com)
*/
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/fs_context.h>
#include "super.h"
#include "cache.h"
@@ -47,7 +50,7 @@ void ceph_fscache_unregister(void)
fscache_unregister_netfs(&ceph_cache_netfs);
}
-int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
+int ceph_fscache_register_fs(struct ceph_fs_client* fsc, struct fs_context *fc)
{
const struct ceph_fsid *fsid = &fsc->client->fsid;
const char *fscache_uniq = fsc->mount_options->fscache_uniq;
@@ -64,8 +67,8 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
if (uniq_len && memcmp(ent->uniquifier, fscache_uniq, uniq_len))
continue;
- pr_err("fscache cookie already registered for fsid %pU\n", fsid);
- pr_err(" use fsc=%%s mount option to specify a uniquifier\n");
+ errorfc(fc, "fscache cookie already registered for fsid %pU, use fsc=<uniquifier> option",
+ fsid);
err = -EBUSY;
goto out_unlock;
}
@@ -93,7 +96,7 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
list_add_tail(&ent->list, &ceph_fscache_list);
} else {
kfree(ent);
- pr_err("unable to register fscache cookie for fsid %pU\n",
+ errorfc(fc, "unable to register fscache cookie for fsid %pU",
fsid);
/* all other fs ignore this error */
}
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index e486fac3434d..89dbdd1eb14a 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -16,7 +16,7 @@ extern struct fscache_netfs ceph_cache_netfs;
int ceph_fscache_register(void);
void ceph_fscache_unregister(void);
-int ceph_fscache_register_fs(struct ceph_fs_client* fsc);
+int ceph_fscache_register_fs(struct ceph_fs_client* fsc, struct fs_context *fc);
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc);
void ceph_fscache_register_inode_cookie(struct inode *inode);
@@ -88,7 +88,8 @@ static inline void ceph_fscache_unregister(void)
{
}
-static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
+static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc,
+ struct fs_context *fc)
{
return 0;
}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d98dcd976c80..28ae0c134700 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -458,37 +458,6 @@ struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
}
/*
- * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
- */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
-{
- struct ceph_cap *cap;
- int mds = -1;
- struct rb_node *p;
-
- /* prefer mds with WR|BUFFER|EXCL caps */
- for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
- cap = rb_entry(p, struct ceph_cap, ci_node);
- mds = cap->mds;
- if (cap->issued & (CEPH_CAP_FILE_WR |
- CEPH_CAP_FILE_BUFFER |
- CEPH_CAP_FILE_EXCL))
- break;
- }
- return mds;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int mds;
- spin_lock(&ci->i_ceph_lock);
- mds = __ceph_get_cap_mds(ceph_inode(inode));
- spin_unlock(&ci->i_ceph_lock);
- return mds;
-}
-
-/*
* Called under i_ceph_lock.
*/
static void __insert_cap_node(struct ceph_inode_info *ci,
@@ -628,7 +597,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
/*
* Add a capability under the given MDS session.
*
- * Caller should hold session snap_rwsem (read) and s_mutex.
+ * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
*
* @fmode is the open file mode, if we are opening a file, otherwise
* it is < 0. (This is so we can atomically add the cap and add an
@@ -645,6 +614,9 @@ void ceph_add_cap(struct inode *inode,
struct ceph_cap *cap;
int mds = session->s_mds;
int actual_wanted;
+ u32 gen;
+
+ lockdep_assert_held(&ci->i_ceph_lock);
dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
session->s_mds, cap_id, ceph_cap_string(issued), seq);
@@ -656,6 +628,10 @@ void ceph_add_cap(struct inode *inode,
if (fmode >= 0)
wanted |= ceph_caps_for_mode(fmode);
+ spin_lock(&session->s_gen_ttl_lock);
+ gen = session->s_cap_gen;
+ spin_unlock(&session->s_gen_ttl_lock);
+
cap = __get_cap_for_mds(ci, mds);
if (!cap) {
cap = *new_cap;
@@ -681,7 +657,7 @@ void ceph_add_cap(struct inode *inode,
list_move_tail(&cap->session_caps, &session->s_caps);
spin_unlock(&session->s_cap_lock);
- if (cap->cap_gen < session->s_cap_gen)
+ if (cap->cap_gen < gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/*
@@ -775,7 +751,7 @@ void ceph_add_cap(struct inode *inode,
cap->seq = seq;
cap->issue_seq = seq;
cap->mseq = mseq;
- cap->cap_gen = session->s_cap_gen;
+ cap->cap_gen = gen;
if (fmode >= 0)
__ceph_get_fmode(ci, fmode);
@@ -932,7 +908,8 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
ci_node);
if (!__cap_is_valid(cap))
continue;
- __touch_cap(cap);
+ if (cap->issued & mask)
+ __touch_cap(cap);
}
}
return 1;
@@ -1035,18 +1012,13 @@ static int __ceph_is_single_caps(struct ceph_inode_info *ci)
return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
}
-static int __ceph_is_any_caps(struct ceph_inode_info *ci)
-{
- return !RB_EMPTY_ROOT(&ci->i_caps);
-}
-
int ceph_is_any_caps(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int ret;
spin_lock(&ci->i_ceph_lock);
- ret = __ceph_is_any_caps(ci);
+ ret = __ceph_is_any_real_caps(ci);
spin_unlock(&ci->i_ceph_lock);
return ret;
@@ -1082,6 +1054,11 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
+ /* remove from inode's cap rbtree, and clear auth cap */
+ rb_erase(&cap->ci_node, &ci->i_caps);
+ if (ci->i_auth_cap == cap)
+ ci->i_auth_cap = NULL;
+
/* remove from session list */
spin_lock(&session->s_cap_lock);
if (session->s_cap_iterator == cap) {
@@ -1115,23 +1092,19 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
spin_unlock(&session->s_cap_lock);
- /* remove from inode list */
- rb_erase(&cap->ci_node, &ci->i_caps);
- if (ci->i_auth_cap == cap)
- ci->i_auth_cap = NULL;
-
if (removed)
ceph_put_cap(mdsc, cap);
- /* when reconnect denied, we remove session caps forcibly,
- * i_wr_ref can be non-zero. If there are ongoing write,
- * keep i_snap_realm.
- */
- if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
- drop_inode_snap_realm(ci);
+ if (!__ceph_is_any_real_caps(ci)) {
+ /* when reconnect denied, we remove session caps forcibly,
+ * i_wr_ref can be non-zero. If there are ongoing write,
+ * keep i_snap_realm.
+ */
+ if (ci->i_wr_ref == 0 && ci->i_snap_realm)
+ drop_inode_snap_realm(ci);
- if (!__ceph_is_any_real_caps(ci))
__cap_delay_cancel(mdsc, ci);
+ }
}
struct cap_msg_args {
@@ -1284,10 +1257,6 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
* Make note of max_size reported/requested from mds, revoked caps
* that have now been implemented.
*
- * Make half-hearted attempt ot to invalidate page cache if we are
- * dropping RDCACHE. Note that this will leave behind locked pages
- * that we'll then need to deal with elsewhere.
- *
* Return non-zero if delayed release, or we experienced an error
* such that the caller should requeue + retry later.
*
@@ -1301,6 +1270,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
+ struct ceph_buffer *old_blob = NULL;
struct cap_msg_args arg;
int held, revoking;
int wake = 0;
@@ -1365,7 +1335,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ci->i_requested_max_size = arg.max_size;
if (flushing & CEPH_CAP_XATTR_EXCL) {
- __ceph_build_xattrs_blob(ci);
+ old_blob = __ceph_build_xattrs_blob(ci);
arg.xattr_version = ci->i_xattrs.version;
arg.xattr_buf = ci->i_xattrs.blob;
} else {
@@ -1409,6 +1379,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
spin_unlock(&ci->i_ceph_lock);
+ ceph_buffer_put(old_blob);
+
ret = send_cap_msg(&arg);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
@@ -1743,11 +1715,11 @@ static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
*
- * Called under i_ceph_lock.
+ * Called under i_ceph_lock. Returns the flush tid.
*/
-static int __mark_caps_flushing(struct inode *inode,
+static u64 __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session, bool wake,
- u64 *flush_tid, u64 *oldest_flush_tid)
+ u64 *oldest_flush_tid)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1786,8 +1758,7 @@ static int __mark_caps_flushing(struct inode *inode,
list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
- *flush_tid = cf->tid;
- return flushing;
+ return cf->tid;
}
/*
@@ -2025,11 +1996,6 @@ retry_locked:
}
ack:
- if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
- dout(" skipping %p I_NOFLUSH set\n", inode);
- continue;
- }
-
if (session && session != cap->session) {
dout("oops, wrong session %p mutex\n", session);
mutex_unlock(&session->s_mutex);
@@ -2077,9 +2043,9 @@ ack:
}
if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
- flushing = __mark_caps_flushing(inode, session, false,
- &flush_tid,
- &oldest_flush_tid);
+ flushing = ci->i_dirty_caps;
+ flush_tid = __mark_caps_flushing(inode, session, false,
+ &oldest_flush_tid);
} else {
flushing = 0;
flush_tid = 0;
@@ -2127,16 +2093,11 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
retry:
spin_lock(&ci->i_ceph_lock);
retry_locked:
- if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
- spin_unlock(&ci->i_ceph_lock);
- dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
- goto out;
- }
if (ci->i_dirty_caps && ci->i_auth_cap) {
struct ceph_cap *cap = ci->i_auth_cap;
int delayed;
- if (!session || session != cap->session) {
+ if (session != cap->session) {
spin_unlock(&ci->i_ceph_lock);
if (session)
mutex_unlock(&session->s_mutex);
@@ -2158,8 +2119,9 @@ retry_locked:
goto retry_locked;
}
- flushing = __mark_caps_flushing(inode, session, true,
- &flush_tid, &oldest_flush_tid);
+ flushing = ci->i_dirty_caps;
+ flush_tid = __mark_caps_flushing(inode, session, true,
+ &oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
@@ -2258,35 +2220,45 @@ static int unsafe_request_wait(struct inode *inode)
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
+ struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_mapping->host;
struct ceph_inode_info *ci = ceph_inode(inode);
u64 flush_tid;
- int ret;
+ int ret, err;
int dirty;
dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
ret = file_write_and_wait_range(file, start, end);
- if (ret < 0)
- goto out;
-
if (datasync)
goto out;
dirty = try_flush_caps(inode, &flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
- ret = unsafe_request_wait(inode);
+ err = unsafe_request_wait(inode);
/*
* only wait on non-file metadata writeback (the mds
* can recover size and mtime, so we don't need to
* wait for that)
*/
- if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
- ret = wait_event_interruptible(ci->i_cap_wq,
+ if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
+ err = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid));
}
+
+ if (err < 0)
+ ret = err;
+
+ if (errseq_check(&ci->i_meta_err, READ_ONCE(fi->meta_err))) {
+ spin_lock(&file->f_lock);
+ err = errseq_check_and_advance(&ci->i_meta_err,
+ &fi->meta_err);
+ spin_unlock(&file->f_lock);
+ if (err < 0)
+ ret = err;
+ }
out:
dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
return ret;
@@ -2557,10 +2529,15 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
*
* FIXME: how does a 0 return differ from -EAGAIN?
*/
-static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- loff_t endoff, bool nonblock, int *got)
+enum {
+ NON_BLOCKING = 1,
+ CHECK_FILELOCK = 2,
+};
+
+static int try_get_cap_refs(struct inode *inode, int need, int want,
+ loff_t endoff, int flags, int *got)
{
- struct inode *inode = &ci->vfs_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
int ret = 0;
int have, implemented;
@@ -2573,6 +2550,13 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
again:
spin_lock(&ci->i_ceph_lock);
+ if ((flags & CHECK_FILELOCK) &&
+ (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
+ dout("try_get_cap_refs %p error filelock\n", inode);
+ ret = -EIO;
+ goto out_unlock;
+ }
+
/* make sure file is actually open */
file_wanted = __ceph_caps_file_wanted(ci);
if ((file_wanted & need) != need) {
@@ -2634,7 +2618,7 @@ again:
* we can not call down_read() when
* task isn't in TASK_RUNNING state
*/
- if (nonblock) {
+ if (flags & NON_BLOCKING) {
ret = -EAGAIN;
goto out_unlock;
}
@@ -2728,18 +2712,19 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_try_get_caps(struct inode *inode, int need, int want,
bool nonblock, int *got)
{
int ret;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
- ret = ceph_pool_perm_check(ci, need);
+ ret = ceph_pool_perm_check(inode, need);
if (ret < 0)
return ret;
- ret = try_get_cap_refs(ci, need, want, 0, nonblock, got);
+ ret = try_get_cap_refs(inode, need, want, 0,
+ (nonblock ? NON_BLOCKING : 0), got);
return ret == -EAGAIN ? 0 : ret;
}
@@ -2748,30 +2733,52 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
* due to a small max_size, make sure we check_max_size (and possibly
* ask the mds) so we don't get hung up indefinitely.
*/
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_get_caps(struct file *filp, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
- int _got, ret;
+ struct ceph_file_info *fi = filp->private_data;
+ struct inode *inode = file_inode(filp);
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ int ret, _got, flags;
- ret = ceph_pool_perm_check(ci, need);
+ ret = ceph_pool_perm_check(inode, need);
if (ret < 0)
return ret;
+ if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+ fi->filp_gen != READ_ONCE(fsc->filp_gen))
+ return -EBADF;
+
while (true) {
if (endoff > 0)
- check_max_size(&ci->vfs_inode, endoff);
+ check_max_size(inode, endoff);
+ flags = atomic_read(&fi->num_locks) ? CHECK_FILELOCK : 0;
_got = 0;
- ret = try_get_cap_refs(ci, need, want, endoff,
- false, &_got);
+ ret = try_get_cap_refs(inode, need, want, endoff,
+ flags, &_got);
if (ret == -EAGAIN)
continue;
if (!ret) {
+ struct ceph_mds_client *mdsc = fsc->mdsc;
+ struct cap_wait cw;
DEFINE_WAIT_FUNC(wait, woken_wake_function);
+
+ cw.ino = inode->i_ino;
+ cw.tgid = current->tgid;
+ cw.need = need;
+ cw.want = want;
+
+ spin_lock(&mdsc->caps_list_lock);
+ list_add(&cw.list, &mdsc->cap_wait_list);
+ spin_unlock(&mdsc->caps_list_lock);
+
add_wait_queue(&ci->i_cap_wq, &wait);
- while (!(ret = try_get_cap_refs(ci, need, want, endoff,
- true, &_got))) {
+ flags |= NON_BLOCKING;
+ while (!(ret = try_get_cap_refs(inode, need, want,
+ endoff, flags, &_got))) {
if (signal_pending(current)) {
ret = -ERESTARTSYS;
break;
@@ -2780,13 +2787,26 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
}
remove_wait_queue(&ci->i_cap_wq, &wait);
+
+ spin_lock(&mdsc->caps_list_lock);
+ list_del(&cw.list);
+ spin_unlock(&mdsc->caps_list_lock);
+
if (ret == -EAGAIN)
continue;
}
+
+ if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+ fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
+ if (ret >= 0 && _got)
+ ceph_put_cap_refs(ci, _got);
+ return -EBADF;
+ }
+
if (ret < 0) {
if (ret == -ESTALE) {
/* session was killed, try renew caps */
- ret = ceph_renew_caps(&ci->vfs_inode);
+ ret = ceph_renew_caps(inode);
if (ret == 0)
continue;
}
@@ -2795,9 +2815,9 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
- i_size_read(&ci->vfs_inode) > 0) {
+ i_size_read(inode) > 0) {
struct page *page =
- find_get_page(ci->vfs_inode.i_mapping, 0);
+ find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
@@ -2816,7 +2836,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
* getattr request will bring inline data into
* page cache
*/
- ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+ ret = __ceph_do_getattr(inode, NULL,
CEPH_STAT_CAP_INLINE_DATA,
true);
if (ret < 0)
@@ -2922,7 +2942,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
ci->i_head_snapc = NULL;
}
/* see comment in __ceph_remove_cap() */
- if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
+ if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
drop_inode_snap_realm(ci);
}
spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 2eb88ed22993..fb7cabd98e7b 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -33,7 +33,7 @@ static int mdsmap_show(struct seq_file *s, void *p)
seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds);
seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout);
seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose);
- for (i = 0; i < mdsmap->m_num_mds; i++) {
+ for (i = 0; i < mdsmap->possible_max_rank; i++) {
struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
int state = mdsmap->m_info[i].state;
seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
@@ -139,6 +139,7 @@ static int caps_show(struct seq_file *s, void *p)
struct ceph_fs_client *fsc = s->private;
struct ceph_mds_client *mdsc = fsc->mdsc;
int total, avail, used, reserved, min, i;
+ struct cap_wait *cw;
ceph_reservation_status(fsc, &total, &avail, &used, &reserved, &min);
seq_printf(s, "total\t\t%d\n"
@@ -166,6 +167,18 @@ static int caps_show(struct seq_file *s, void *p)
}
mutex_unlock(&mdsc->mutex);
+ seq_printf(s, "\n\nWaiters:\n--------\n");
+ seq_printf(s, "tgid ino need want\n");
+ seq_printf(s, "-----------------------------------------------------\n");
+
+ spin_lock(&mdsc->caps_list_lock);
+ list_for_each_entry(cw, &mdsc->cap_wait_list, list) {
+ seq_printf(s, "%-13d0x%-17lx%-17s%-17s\n", cw->tgid, cw->ino,
+ ceph_cap_string(cw->need),
+ ceph_cap_string(cw->want));
+ }
+ spin_unlock(&mdsc->caps_list_lock);
+
return 0;
}
@@ -294,7 +307,6 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
{
- return 0;
}
void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 4ca0b8ff9a72..d0cd0aba5843 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1186,7 +1186,7 @@ void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di)
struct dentry *dn = di->dentry;
struct ceph_mds_client *mdsc;
- dout("dentry_dir_lease_touch %p %p '%pd' (offset %lld)\n",
+ dout("dentry_dir_lease_touch %p %p '%pd' (offset 0x%llx)\n",
di, dn, dn, di->offset);
if (!list_empty(&di->lease_list)) {
@@ -1553,36 +1553,37 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
int valid = 0;
struct dentry *parent;
- struct inode *dir;
+ struct inode *dir, *inode;
if (flags & LOOKUP_RCU) {
parent = READ_ONCE(dentry->d_parent);
dir = d_inode_rcu(parent);
if (!dir)
return -ECHILD;
+ inode = d_inode_rcu(dentry);
} else {
parent = dget_parent(dentry);
dir = d_inode(parent);
+ inode = d_inode(dentry);
}
- dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
- dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
+ dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry,
+ dentry, inode, ceph_dentry(dentry)->offset);
/* always trust cached snapped dentries, snapdir dentry */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
- dentry, d_inode(dentry));
+ dentry, inode);
valid = 1;
- } else if (d_really_is_positive(dentry) &&
- ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
+ } else if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
valid = 1;
} else {
valid = dentry_lease_is_valid(dentry, flags);
if (valid == -ECHILD)
return valid;
if (valid || dir_lease_is_valid(dir, dentry)) {
- if (d_really_is_positive(dentry))
- valid = ceph_is_any_caps(d_inode(dentry));
+ if (inode)
+ valid = ceph_is_any_caps(inode);
else
valid = 1;
}
@@ -1808,6 +1809,7 @@ const struct file_operations ceph_dir_fops = {
.open = ceph_open,
.release = ceph_release,
.unlocked_ioctl = ceph_ioctl,
+ .compat_ioctl = compat_ptr_ioctl,
.fsync = ceph_fsync,
.lock = ceph_lock,
.flock = ceph_flock,
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 15ff1b09cfa2..b6bfa94332c3 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -35,7 +35,7 @@ struct ceph_nfs_snapfh {
static int ceph_encode_snapfh(struct inode *inode, u32 *rawfh, int *max_len,
struct inode *parent_inode)
{
- const static int snap_handle_length =
+ static const int snap_handle_length =
sizeof(struct ceph_nfs_snapfh) >> 2;
struct ceph_nfs_snapfh *sfh = (void *)rawfh;
u64 snapid = ceph_snap(inode);
@@ -85,9 +85,9 @@ out:
static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
struct inode *parent_inode)
{
- const static int handle_length =
+ static const int handle_length =
sizeof(struct ceph_nfs_fh) >> 2;
- const static int connected_handle_length =
+ static const int connected_handle_length =
sizeof(struct ceph_nfs_confh) >> 2;
int type;
@@ -458,33 +458,33 @@ static int __get_snap_name(struct dentry *parent, char *name,
if (err < 0)
goto out;
- rinfo = &req->r_reply_info;
- for (i = 0; i < rinfo->dir_nr; i++) {
- rde = rinfo->dir_entries + i;
- BUG_ON(!rde->inode.in);
- if (ceph_snap(inode) ==
- le64_to_cpu(rde->inode.in->snapid)) {
- memcpy(name, rde->name, rde->name_len);
- name[rde->name_len] = '\0';
- err = 0;
- goto out;
- }
- }
-
- if (rinfo->dir_end)
- break;
-
- BUG_ON(rinfo->dir_nr <= 0);
- rde = rinfo->dir_entries + (rinfo->dir_nr - 1);
- next_offset += rinfo->dir_nr;
- last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL);
- if (!last_name) {
- err = -ENOMEM;
- goto out;
- }
-
- ceph_mdsc_put_request(req);
- req = NULL;
+ rinfo = &req->r_reply_info;
+ for (i = 0; i < rinfo->dir_nr; i++) {
+ rde = rinfo->dir_entries + i;
+ BUG_ON(!rde->inode.in);
+ if (ceph_snap(inode) ==
+ le64_to_cpu(rde->inode.in->snapid)) {
+ memcpy(name, rde->name, rde->name_len);
+ name[rde->name_len] = '\0';
+ err = 0;
+ goto out;
+ }
+ }
+
+ if (rinfo->dir_end)
+ break;
+
+ BUG_ON(rinfo->dir_nr <= 0);
+ rde = rinfo->dir_entries + (rinfo->dir_nr - 1);
+ next_offset += rinfo->dir_nr;
+ last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL);
+ if (!last_name) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ ceph_mdsc_put_request(req);
+ req = NULL;
}
err = -ENOENT;
out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 685a03cc4b77..7e0190b1f821 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -15,6 +15,7 @@
#include "super.h"
#include "mds_client.h"
#include "cache.h"
+#include "io.h"
static __le32 ceph_flags_sys2wire(u32 flags)
{
@@ -201,6 +202,7 @@ out:
static int ceph_init_file_info(struct inode *inode, struct file *file,
int fmode, bool isdir)
{
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi;
dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
@@ -211,7 +213,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
struct ceph_dir_file_info *dfi =
kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
if (!dfi) {
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+ ceph_put_fmode(ci, fmode); /* clean up */
return -ENOMEM;
}
@@ -222,7 +224,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
} else {
fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
if (!fi) {
- ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+ ceph_put_fmode(ci, fmode); /* clean up */
return -ENOMEM;
}
@@ -232,6 +234,8 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
fi->fmode = fmode;
spin_lock_init(&fi->rw_contexts_lock);
INIT_LIST_HEAD(&fi->rw_contexts);
+ fi->meta_err = errseq_sample(&ci->i_meta_err);
+ fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
return 0;
}
@@ -458,6 +462,9 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = ceph_security_init_secctx(dentry, mode, &as_ctx);
if (err < 0)
goto out_ctx;
+ } else if (!d_in_lookup(dentry)) {
+ /* If it's not being looked up, it's negative */
+ return -ENOENT;
}
/* do the open */
@@ -695,7 +702,13 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
ceph_release_page_vector(pages, num_pages);
}
- if (ret <= 0 || off >= i_size || !more)
+ if (ret < 0) {
+ if (ret == -EBLACKLISTED)
+ fsc->blacklisted = true;
+ break;
+ }
+
+ if (off >= i_size || !more)
break;
}
@@ -740,6 +753,9 @@ static void ceph_aio_complete(struct inode *inode,
if (!atomic_dec_and_test(&aio_req->pending_reqs))
return;
+ if (aio_req->iocb->ki_flags & IOCB_DIRECT)
+ inode_dio_end(inode);
+
ret = aio_req->error;
if (!ret)
ret = aio_req->total_len;
@@ -921,7 +937,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct ceph_aio_request *aio_req = NULL;
int num_pages = 0;
int flags;
- int ret;
+ int ret = 0;
struct timespec64 mtime = current_time(inode);
size_t count = iov_iter_count(iter);
loff_t pos = iocb->ki_pos;
@@ -935,11 +951,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
(write ? "write" : "read"), file, pos, (unsigned)count,
snapc, snapc ? snapc->seq : 0);
- ret = filemap_write_and_wait_range(inode->i_mapping,
- pos, pos + count - 1);
- if (ret < 0)
- return ret;
-
if (write) {
int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_SHIFT,
@@ -1083,6 +1094,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
CEPH_CAP_FILE_RD);
list_splice(&aio_req->osd_reqs, &osd_reqs);
+ inode_dio_begin(inode);
while (!list_empty(&osd_reqs)) {
req = list_first_entry(&osd_reqs,
struct ceph_osd_request,
@@ -1256,13 +1268,24 @@ again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_start_io_direct(inode);
+ else
+ ceph_start_io_read(inode);
+
if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
- if (ret < 0)
+ ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
+ &got, &pinned_page);
+ if (ret < 0) {
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_read(inode);
return ret;
+ }
if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_flags & IOCB_DIRECT) ||
@@ -1293,6 +1316,7 @@ again:
ret = generic_file_read_iter(iocb, to);
ceph_del_rw_context(fi, &rw_ctx);
}
+
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) {
@@ -1300,6 +1324,12 @@ again:
pinned_page = NULL;
}
ceph_put_cap_refs(ci, got);
+
+ if (iocb->ki_flags & IOCB_DIRECT)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_read(inode);
+
if (retry_op > HAVE_RETRIED && ret >= 0) {
int statret;
struct page *page = NULL;
@@ -1388,6 +1418,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
struct ceph_cap_flush *prealloc_cf;
ssize_t count, written = 0;
int err, want, got;
+ bool direct_lock = false;
loff_t pos;
loff_t limit = max(i_size_read(inode), fsc->max_file_size);
@@ -1398,8 +1429,14 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (!prealloc_cf)
return -ENOMEM;
+ if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT)
+ direct_lock = true;
+
retry_snap:
- inode_lock(inode);
+ if (direct_lock)
+ ceph_start_io_direct(inode);
+ else
+ ceph_start_io_write(inode);
/* We can write back this queue in page reclaim */
current->backing_dev_info = inode_to_bdi(inode);
@@ -1457,7 +1494,7 @@ retry_snap:
else
want = CEPH_CAP_FILE_BUFFER;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+ err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
&got, NULL);
if (err < 0)
goto out;
@@ -1470,7 +1507,6 @@ retry_snap:
(ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
struct ceph_snap_context *snapc;
struct iov_iter data;
- inode_unlock(inode);
spin_lock(&ci->i_ceph_lock);
if (__ceph_have_pending_cap_snap(ci)) {
@@ -1492,6 +1528,10 @@ retry_snap:
&prealloc_cf);
else
written = ceph_sync_write(iocb, &data, pos, snapc);
+ if (direct_lock)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_write(inode);
if (written > 0)
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
@@ -1506,7 +1546,7 @@ retry_snap:
written = generic_perform_write(file, from, pos);
if (likely(written >= 0))
iocb->ki_pos = pos + written;
- inode_unlock(inode);
+ ceph_end_io_write(inode);
}
if (written >= 0) {
@@ -1541,9 +1581,11 @@ retry_snap:
}
goto out_unlocked;
-
out:
- inode_unlock(inode);
+ if (direct_lock)
+ ceph_end_io_direct(inode);
+ else
+ ceph_end_io_write(inode);
out_unlocked:
ceph_free_cap_flush(prealloc_cf);
current->backing_dev_info = NULL;
@@ -1781,7 +1823,7 @@ static long ceph_fallocate(struct file *file, int mode,
else
want = CEPH_CAP_FILE_BUFFER;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
+ ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0)
goto unlock;
@@ -1810,16 +1852,15 @@ unlock:
* src_ci. Two attempts are made to obtain both caps, and an error is return if
* this fails; zero is returned on success.
*/
-static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
- loff_t src_endoff, int *src_got,
- struct ceph_inode_info *dst_ci,
+static int get_rd_wr_caps(struct file *src_filp, int *src_got,
+ struct file *dst_filp,
loff_t dst_endoff, int *dst_got)
{
int ret = 0;
bool retrying = false;
retry_caps:
- ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+ ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
dst_endoff, dst_got, NULL);
if (ret < 0)
return ret;
@@ -1829,24 +1870,24 @@ retry_caps:
* we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
* retry dance instead to try to get both capabilities.
*/
- ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+ ret = ceph_try_get_caps(file_inode(src_filp),
+ CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
false, src_got);
if (ret <= 0) {
/* Start by dropping dst_ci caps and getting src_ci caps */
- ceph_put_cap_refs(dst_ci, *dst_got);
+ ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
if (retrying) {
if (!ret)
/* ceph_try_get_caps masks EAGAIN */
ret = -EAGAIN;
return ret;
}
- ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
- CEPH_CAP_FILE_SHARED, src_endoff,
- src_got, NULL);
+ ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
+ CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
if (ret < 0)
return ret;
/*... drop src_ci caps too, and retry */
- ceph_put_cap_refs(src_ci, *src_got);
+ ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
retrying = true;
goto retry_caps;
}
@@ -1904,6 +1945,7 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
struct ceph_inode_info *src_ci = ceph_inode(src_inode);
struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
struct ceph_cap_flush *prealloc_cf;
+ struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
struct ceph_object_locator src_oloc, dst_oloc;
struct ceph_object_id src_oid, dst_oid;
loff_t endoff = 0, size;
@@ -1913,10 +1955,16 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
int src_got = 0, dst_got = 0, err, dirty;
bool do_final_copy = false;
- if (src_inode == dst_inode)
- return -EINVAL;
- if (src_inode->i_sb != dst_inode->i_sb)
- return -EXDEV;
+ if (src_inode->i_sb != dst_inode->i_sb) {
+ struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
+
+ if (ceph_fsid_compare(&src_fsc->client->fsid,
+ &dst_fsc->client->fsid)) {
+ dout("Copying files across clusters: src: %pU dst: %pU\n",
+ &src_fsc->client->fsid, &dst_fsc->client->fsid);
+ return -EXDEV;
+ }
+ }
if (ceph_snap(dst_inode) != CEPH_NOSNAP)
return -EROFS;
@@ -1928,13 +1976,24 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
* efficient).
*/
- if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+ if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
+ return -EOPNOTSUPP;
+
+ if (!src_fsc->have_copy_from2)
return -EOPNOTSUPP;
+ /*
+ * Striped file layouts require that we copy partial objects, but the
+ * OSD copy-from operation only supports full-object copies. Limit
+ * this to non-striped file layouts for now.
+ */
if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
- (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
- (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+ (src_ci->i_layout.stripe_count != 1) ||
+ (dst_ci->i_layout.stripe_count != 1) ||
+ (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) {
+ dout("Invalid src/dst files layout\n");
return -EOPNOTSUPP;
+ }
if (len < src_ci->i_layout.object_size)
return -EOPNOTSUPP; /* no remote copy will be done */
@@ -1960,8 +2019,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
* clients may have dirty data in their caches. And OSDs know nothing
* about caps, so they can't safely do the remote object copies.
*/
- err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
- dst_ci, (dst_off + len), &dst_got);
+ err = get_rd_wr_caps(src_file, &src_got,
+ dst_file, (dst_off + len), &dst_got);
if (err < 0) {
dout("get_rd_wr_caps returned %d\n", err);
ret = -EOPNOTSUPP;
@@ -2018,9 +2077,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
goto out;
}
len -= ret;
- err = get_rd_wr_caps(src_ci, (src_off + len),
- &src_got, dst_ci,
- (dst_off + len), &dst_got);
+ err = get_rd_wr_caps(src_file, &src_got,
+ dst_file, (dst_off + len), &dst_got);
if (err < 0)
goto out;
err = is_file_size_ok(src_inode, dst_inode,
@@ -2044,15 +2102,21 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
dst_ci->i_vino.ino, dst_objnum);
/* Do an object remote copy */
err = ceph_osdc_copy_from(
- &ceph_inode_to_client(src_inode)->client->osdc,
+ &src_fsc->client->osdc,
src_ci->i_vino.snap, 0,
&src_oid, &src_oloc,
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
&dst_oid, &dst_oloc,
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
- CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+ CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
+ dst_ci->i_truncate_seq, dst_ci->i_truncate_size,
+ CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
if (err) {
+ if (err == -EOPNOTSUPP) {
+ src_fsc->have_copy_from2 = false;
+ pr_notice("OSDs don't support copy-from2; disabling copy offload\n");
+ }
dout("ceph_osdc_copy_from returned %d\n", err);
if (!ret)
ret = err;
@@ -2138,7 +2202,7 @@ const struct file_operations ceph_file_fops = {
.splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write,
.unlocked_ioctl = ceph_ioctl,
- .compat_ioctl = ceph_ioctl,
+ .compat_ioctl = compat_ptr_ioctl,
.fallocate = ceph_fallocate,
.copy_file_range = ceph_copy_file_range,
};
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 791f84a13bb8..d01710a16a4a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -55,11 +55,9 @@ struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
if (!inode)
return ERR_PTR(-ENOMEM);
- if (inode->i_state & I_NEW) {
+ if (inode->i_state & I_NEW)
dout("get_inode created new inode %p %llx.%llx ino %llx\n",
inode, ceph_vinop(inode), (u64)inode->i_ino);
- unlock_new_inode(inode);
- }
dout("get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
vino.snap, inode);
@@ -88,6 +86,10 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0;
+
+ if (inode->i_state & I_NEW)
+ unlock_new_inode(inode);
+
return inode;
}
@@ -515,6 +517,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ceph_fscache_inode_init(ci);
+ ci->i_meta_err = 0;
+
return &ci->vfs_inode;
}
@@ -726,8 +730,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
static int fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
- struct ceph_mds_session *session,
- unsigned long ttl_from, int cap_fmode,
+ struct ceph_mds_session *session, int cap_fmode,
struct ceph_cap_reservation *caps_reservation)
{
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
@@ -736,6 +739,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued, new_issued, info_caps;
struct timespec64 mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL;
+ struct ceph_buffer *old_blob = NULL;
struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0;
@@ -751,8 +755,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
info_caps = le32_to_cpu(info->cap.caps);
/* prealloc new cap struct */
- if (info_caps && ceph_snap(inode) == CEPH_NOSNAP)
+ if (info_caps && ceph_snap(inode) == CEPH_NOSNAP) {
new_cap = ceph_get_cap(mdsc, caps_reservation);
+ if (!new_cap)
+ return -ENOMEM;
+ }
/*
* prealloc xattr data, if it looks like we'll need it. only
@@ -800,7 +807,12 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
/* update inode */
inode->i_rdev = le32_to_cpu(info->rdev);
- inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+ /* directories have fl_stripe_unit set to zero */
+ if (le32_to_cpu(info->layout.fl_stripe_unit))
+ inode->i_blkbits =
+ fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+ else
+ inode->i_blkbits = CEPH_BLOCK_SHIFT;
__ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);
@@ -881,7 +893,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
if (ci->i_xattrs.blob)
- ceph_buffer_put(ci->i_xattrs.blob);
+ old_blob = ci->i_xattrs.blob;
ci->i_xattrs.blob = xattr_blob;
if (xattr_blob)
memcpy(ci->i_xattrs.blob->vec.iov_base,
@@ -1022,8 +1034,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
out:
if (new_cap)
ceph_put_cap(mdsc, new_cap);
- if (xattr_blob)
- ceph_buffer_put(xattr_blob);
+ ceph_buffer_put(old_blob);
+ ceph_buffer_put(xattr_blob);
ceph_put_string(pool_ns);
return err;
}
@@ -1229,7 +1241,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
if (dir) {
err = fill_inode(dir, NULL,
&rinfo->diri, rinfo->dirfrag,
- session, req->r_request_started, -1,
+ session, -1,
&req->r_caps_reservation);
if (err < 0)
goto done;
@@ -1294,18 +1306,22 @@ retry_lookup:
err = PTR_ERR(in);
goto done;
}
- req->r_target_inode = in;
err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
- session, req->r_request_started,
+ session,
(!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
- rinfo->head->result == 0) ? req->r_fmode : -1,
+ rinfo->head->result == 0) ? req->r_fmode : -1,
&req->r_caps_reservation);
if (err < 0) {
pr_err("fill_inode badness %p %llx.%llx\n",
in, ceph_vinop(in));
+ if (in->i_state & I_NEW)
+ discard_new_inode(in);
goto done;
}
+ req->r_target_inode = in;
+ if (in->i_state & I_NEW)
+ unlock_new_inode(in);
}
/*
@@ -1426,6 +1442,7 @@ retry_lookup:
dout(" final dn %p\n", dn);
} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
req->r_op == CEPH_MDS_OP_MKSNAP) &&
+ test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
struct inode *dir = req->r_parent;
@@ -1484,12 +1501,18 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
continue;
}
rc = fill_inode(in, NULL, &rde->inode, NULL, session,
- req->r_request_started, -1,
- &req->r_caps_reservation);
+ -1, &req->r_caps_reservation);
if (rc < 0) {
pr_err("fill_inode badness on %p got %d\n", in, rc);
err = rc;
+ if (in->i_state & I_NEW) {
+ ihold(in);
+ discard_new_inode(in);
+ }
+ } else if (in->i_state & I_NEW) {
+ unlock_new_inode(in);
}
+
/* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(in);
}
@@ -1685,19 +1708,24 @@ retry_lookup:
}
ret = fill_inode(in, NULL, &rde->inode, NULL, session,
- req->r_request_started, -1,
- &req->r_caps_reservation);
+ -1, &req->r_caps_reservation);
if (ret < 0) {
pr_err("fill_inode badness on %p\n", in);
if (d_really_is_negative(dn)) {
/* avoid calling iput_final() in mds
* dispatch threads */
+ if (in->i_state & I_NEW) {
+ ihold(in);
+ discard_new_inode(in);
+ }
ceph_async_iput(in);
}
d_drop(dn);
err = ret;
goto next_item;
}
+ if (in->i_state & I_NEW)
+ unlock_new_inode(in);
if (d_really_is_negative(dn)) {
if (ceph_security_xattr_deadlock(in)) {
@@ -1981,7 +2009,7 @@ static const struct inode_operations ceph_symlink_iops = {
int __ceph_setattr(struct inode *inode, struct iattr *attr)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- const unsigned int ia_valid = attr->ia_valid;
+ unsigned int ia_valid = attr->ia_valid;
struct ceph_mds_request *req;
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_cap_flush *prealloc_cf;
@@ -2086,6 +2114,26 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
}
}
+ if (ia_valid & ATTR_SIZE) {
+ dout("setattr %p size %lld -> %lld\n", inode,
+ inode->i_size, attr->ia_size);
+ if ((issued & CEPH_CAP_FILE_EXCL) &&
+ attr->ia_size > inode->i_size) {
+ i_size_write(inode, attr->ia_size);
+ inode->i_blocks = calc_inode_blocks(attr->ia_size);
+ ci->i_reported_size = attr->ia_size;
+ dirtied |= CEPH_CAP_FILE_EXCL;
+ ia_valid |= ATTR_MTIME;
+ } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
+ attr->ia_size != inode->i_size) {
+ req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
+ req->r_args.setattr.old_size =
+ cpu_to_le64(inode->i_size);
+ mask |= CEPH_SETATTR_SIZE;
+ release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+ CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
+ }
+ }
if (ia_valid & ATTR_MTIME) {
dout("setattr %p mtime %lld.%ld -> %lld.%ld\n", inode,
inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
@@ -2108,25 +2156,6 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
}
}
- if (ia_valid & ATTR_SIZE) {
- dout("setattr %p size %lld -> %lld\n", inode,
- inode->i_size, attr->ia_size);
- if ((issued & CEPH_CAP_FILE_EXCL) &&
- attr->ia_size > inode->i_size) {
- i_size_write(inode, attr->ia_size);
- inode->i_blocks = calc_inode_blocks(attr->ia_size);
- ci->i_reported_size = attr->ia_size;
- dirtied |= CEPH_CAP_FILE_EXCL;
- } else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
- attr->ia_size != inode->i_size) {
- req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
- req->r_args.setattr.old_size =
- cpu_to_le64(inode->i_size);
- mask |= CEPH_SETATTR_SIZE;
- release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
- CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
- }
- }
/* these do nothing */
if (ia_valid & ATTR_CTIME) {
diff --git a/fs/ceph/io.c b/fs/ceph/io.c
new file mode 100644
index 000000000000..97602ea92ff4
--- /dev/null
+++ b/fs/ceph/io.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2016 Trond Myklebust
+ * Copyright (c) 2019 Jeff Layton
+ *
+ * I/O and data path helper functionality.
+ *
+ * Heavily borrowed from equivalent code in fs/nfs/io.c
+ */
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/rwsem.h>
+#include <linux/fs.h>
+
+#include "super.h"
+#include "io.h"
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_o_direct(struct ceph_inode_info *ci, struct inode *inode)
+{
+ lockdep_assert_held_write(&inode->i_rwsem);
+
+ if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT) {
+ spin_lock(&ci->i_ceph_lock);
+ ci->i_ceph_flags &= ~CEPH_I_ODIRECT;
+ spin_unlock(&ci->i_ceph_lock);
+ inode_dio_wait(inode);
+ }
+}
+
+/**
+ * ceph_start_io_read - declare the file is being used for buffered reads
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is about to start, and ensure
+ * that we block all direct I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is unset,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that buffered read operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas direct I/O
+ * operations need to wait to grab an exclusive lock in order to set
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. the reads.
+ */
+void
+ceph_start_io_read(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ /* Be an optimist! */
+ down_read(&inode->i_rwsem);
+ if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT))
+ return;
+ up_read(&inode->i_rwsem);
+ /* Slow path.... */
+ down_write(&inode->i_rwsem);
+ ceph_block_o_direct(ci, inode);
+ downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_read - declare that the buffered read operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_read(struct inode *inode)
+{
+ up_read(&inode->i_rwsem);
+}
+
+/**
+ * ceph_start_io_write - declare the file is being used for buffered writes
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is about to start, and ensure
+ * that we block all direct I/O.
+ */
+void
+ceph_start_io_write(struct inode *inode)
+{
+ down_write(&inode->i_rwsem);
+ ceph_block_o_direct(ceph_inode(inode), inode);
+}
+
+/**
+ * ceph_end_io_write - declare that the buffered write operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is done, and release the
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_write(struct inode *inode)
+{
+ up_write(&inode->i_rwsem);
+}
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_buffered(struct ceph_inode_info *ci, struct inode *inode)
+{
+ lockdep_assert_held_write(&inode->i_rwsem);
+
+ if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)) {
+ spin_lock(&ci->i_ceph_lock);
+ ci->i_ceph_flags |= CEPH_I_ODIRECT;
+ spin_unlock(&ci->i_ceph_lock);
+ /* FIXME: unmap_mapping_range? */
+ filemap_write_and_wait(inode->i_mapping);
+ }
+}
+
+/**
+ * ceph_end_io_direct - declare the file is being used for direct i/o
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is about to start, and ensure
+ * that we block all buffered I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is set,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that direct I/O operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas buffered I/O
+ * operations need to wait to grab an exclusive lock in order to clear
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. O_DIRECT.
+ */
+void
+ceph_start_io_direct(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+
+ /* Be an optimist! */
+ down_read(&inode->i_rwsem);
+ if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)
+ return;
+ up_read(&inode->i_rwsem);
+ /* Slow path.... */
+ down_write(&inode->i_rwsem);
+ ceph_block_buffered(ci, inode);
+ downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_direct - declare that the direct i/o operation is done
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_direct(struct inode *inode)
+{
+ up_read(&inode->i_rwsem);
+}
diff --git a/fs/ceph/io.h b/fs/ceph/io.h
new file mode 100644
index 000000000000..fa594cd77348
--- /dev/null
+++ b/fs/ceph/io.h
@@ -0,0 +1,12 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_IO_H
+#define _FS_CEPH_IO_H
+
+void ceph_start_io_read(struct inode *inode);
+void ceph_end_io_read(struct inode *inode);
+void ceph_start_io_write(struct inode *inode);
+void ceph_end_io_write(struct inode *inode);
+void ceph_start_io_direct(struct inode *inode);
+void ceph_end_io_direct(struct inode *inode);
+
+#endif /* FS_CEPH_IO_H */
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index ac9b53b89365..544e9e85b120 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -32,14 +32,18 @@ void __init ceph_flock_init(void)
static void ceph_fl_copy_lock(struct file_lock *dst, struct file_lock *src)
{
- struct inode *inode = file_inode(src->fl_file);
+ struct ceph_file_info *fi = dst->fl_file->private_data;
+ struct inode *inode = file_inode(dst->fl_file);
atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+ atomic_inc(&fi->num_locks);
}
static void ceph_fl_release_lock(struct file_lock *fl)
{
+ struct ceph_file_info *fi = fl->fl_file->private_data;
struct inode *inode = file_inode(fl->fl_file);
struct ceph_inode_info *ci = ceph_inode(inode);
+ atomic_dec(&fi->num_locks);
if (atomic_dec_and_test(&ci->i_filelock_ref)) {
/* clear error when all locks are released */
spin_lock(&ci->i_ceph_lock);
@@ -73,7 +77,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
* window. Caller function will decrease the counter.
*/
fl->fl_ops = &ceph_fl_lock_ops;
- atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+ fl->fl_ops->fl_copy_lock(fl, NULL);
}
if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
@@ -111,8 +115,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
req->r_wait_for_completion = ceph_lock_wait_for_completion;
err = ceph_mdsc_do_request(mdsc, inode, req);
-
- if (operation == CEPH_MDS_OP_GETFILELOCK) {
+ if (!err && operation == CEPH_MDS_OP_GETFILELOCK) {
fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid);
if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
fl->fl_type = F_RDLCK;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 920e9f048bd8..bbbbddf71326 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -9,6 +9,7 @@
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
+#include <linux/bits.h>
#include "super.h"
#include "mds_client.h"
@@ -384,8 +385,8 @@ static int parse_reply_info_readdir(void **p, void *end,
}
done:
- if (*p != end)
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
bad:
@@ -406,12 +407,10 @@ static int parse_reply_info_filelock(void **p, void *end,
goto bad;
info->filelock_reply = *p;
- *p += sizeof(*info->filelock_reply);
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -425,18 +424,21 @@ static int parse_reply_info_create(void **p, void *end,
{
if (features == (u64)-1 ||
(features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
+ /* Malformed reply? */
if (*p == end) {
info->has_create_ino = false;
} else {
info->has_create_ino = true;
- info->ino = ceph_decode_64(p);
+ ceph_decode_64_safe(p, end, info->ino, bad);
}
+ } else {
+ if (*p != end)
+ goto bad;
}
- if (unlikely(*p != end))
- goto bad;
+ /* Skip over any unrecognized fields */
+ *p = end;
return 0;
-
bad:
return -EIO;
}
@@ -529,6 +531,7 @@ const char *ceph_session_state_name(int s)
case CEPH_MDS_SESSION_OPEN: return "open";
case CEPH_MDS_SESSION_HUNG: return "hung";
case CEPH_MDS_SESSION_CLOSING: return "closing";
+ case CEPH_MDS_SESSION_CLOSED: return "closed";
case CEPH_MDS_SESSION_RESTARTING: return "restarting";
case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
case CEPH_MDS_SESSION_REJECTED: return "rejected";
@@ -536,7 +539,7 @@ const char *ceph_session_state_name(int s)
}
}
-static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
if (refcount_inc_not_zero(&s->s_ref)) {
dout("mdsc get_session %p %d -> %d\n", s,
@@ -567,7 +570,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
{
if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
return NULL;
- return get_session(mdsc->sessions[mds]);
+ return ceph_get_mds_session(mdsc->sessions[mds]);
}
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
@@ -596,7 +599,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
{
struct ceph_mds_session *s;
- if (mds >= mdsc->mdsmap->m_num_mds)
+ if (mds >= mdsc->mdsmap->possible_max_rank)
return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -639,7 +642,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_renew_seq = 0;
INIT_LIST_HEAD(&s->s_caps);
s->s_nr_caps = 0;
- s->s_trim_caps = 0;
refcount_set(&s->s_ref, 1);
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
@@ -674,7 +676,6 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
- s->s_state = 0;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
@@ -708,8 +709,10 @@ void ceph_mdsc_release_request(struct kref *kref)
/* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(req->r_inode);
}
- if (req->r_parent)
+ if (req->r_parent) {
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+ ceph_async_iput(req->r_parent);
+ }
ceph_async_iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
@@ -876,7 +879,8 @@ static struct inode *get_nonsnap_parent(struct dentry *dentry)
* Called under mdsc->mutex.
*/
static int __choose_mds(struct ceph_mds_client *mdsc,
- struct ceph_mds_request *req)
+ struct ceph_mds_request *req,
+ bool *random)
{
struct inode *inode;
struct ceph_inode_info *ci;
@@ -886,6 +890,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
u32 hash = req->r_direct_hash;
bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+ if (random)
+ *random = false;
+
/*
* is there a specific mds we should try? ignore hint if we have
* no session and the mds is not up (active or recovering).
@@ -893,7 +900,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
if (req->r_resend_mds >= 0 &&
(__have_session(mdsc, req->r_resend_mds) ||
ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
- dout("choose_mds using resend_mds mds%d\n",
+ dout("%s using resend_mds mds%d\n", __func__,
req->r_resend_mds);
return req->r_resend_mds;
}
@@ -911,7 +918,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
rcu_read_lock();
inode = get_nonsnap_parent(req->r_dentry);
rcu_read_unlock();
- dout("__choose_mds using snapdir's parent %p\n", inode);
+ dout("%s using snapdir's parent %p\n", __func__, inode);
}
} else if (req->r_dentry) {
/* ignore race with rename; old or new d_parent is okay */
@@ -931,7 +938,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
/* direct snapped/virtual snapdir requests
* based on parent dir inode */
inode = get_nonsnap_parent(parent);
- dout("__choose_mds using nonsnap parent %p\n", inode);
+ dout("%s using nonsnap parent %p\n", __func__, inode);
} else {
/* dentry target */
inode = d_inode(req->r_dentry);
@@ -947,8 +954,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
rcu_read_unlock();
}
- dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
- (int)hash, mode);
+ dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
+ hash, mode);
if (!inode)
goto random;
ci = ceph_inode(inode);
@@ -966,30 +973,33 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
get_random_bytes(&r, 1);
r %= frag.ndist;
mds = frag.dist[r];
- dout("choose_mds %p %llx.%llx "
- "frag %u mds%d (%d/%d)\n",
- inode, ceph_vinop(inode),
- frag.frag, mds,
- (int)r, frag.ndist);
+ dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
+ __func__, inode, ceph_vinop(inode),
+ frag.frag, mds, (int)r, frag.ndist);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
- CEPH_MDS_STATE_ACTIVE)
+ CEPH_MDS_STATE_ACTIVE &&
+ !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
goto out;
}
/* since this file/dir wasn't known to be
* replicated, then we want to look for the
* authoritative mds. */
- mode = USE_AUTH_MDS;
if (frag.mds >= 0) {
/* choose auth mds */
mds = frag.mds;
- dout("choose_mds %p %llx.%llx "
- "frag %u mds%d (auth)\n",
- inode, ceph_vinop(inode), frag.frag, mds);
+ dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
+ __func__, inode, ceph_vinop(inode),
+ frag.frag, mds);
if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
- CEPH_MDS_STATE_ACTIVE)
- goto out;
+ CEPH_MDS_STATE_ACTIVE) {
+ if (mode == USE_ANY_MDS &&
+ !ceph_mdsmap_is_laggy(mdsc->mdsmap,
+ mds))
+ goto out;
+ }
}
+ mode = USE_AUTH_MDS;
}
}
@@ -1005,7 +1015,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
goto random;
}
mds = cap->session->s_mds;
- dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
+ dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
inode, ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
@@ -1016,8 +1026,11 @@ out:
return mds;
random:
+ if (random)
+ *random = true;
+
mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
- dout("choose_mds chose random mds%d\n", mds);
+ dout("%s chose random mds%d\n", __func__, mds);
return mds;
}
@@ -1043,20 +1056,21 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
return msg;
}
+static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
+#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
{
- static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
- static const size_t count = ARRAY_SIZE(bits);
+ static const size_t count = ARRAY_SIZE(feature_bits);
if (count > 0) {
size_t i;
- size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8;
+ size_t size = FEATURE_BYTES(count);
BUG_ON(*p + 4 + size > end);
ceph_encode_32(p, size);
memset(*p, 0, size);
for (i = 0; i < count; i++)
- ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8);
+ ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
*p += size;
} else {
BUG_ON(*p + 4 > end);
@@ -1077,6 +1091,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
int metadata_key_count = 0;
struct ceph_options *opt = mdsc->fsc->client->options;
struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
+ size_t size, count;
void *p, *end;
const char* metadata[][2] = {
@@ -1094,8 +1109,13 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
strlen(metadata[i][1]);
metadata_key_count++;
}
+
/* supported feature */
- extra_bytes += 4 + 8;
+ size = 0;
+ count = ARRAY_SIZE(feature_bits);
+ if (count > 0)
+ size = FEATURE_BYTES(count);
+ extra_bytes += 4 + size;
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
@@ -1115,7 +1135,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
*
- * ClientSession messages with metadata are v2
+ * ClientSession messages with metadata are v3
*/
msg->hdr.version = cpu_to_le16(3);
msg->hdr.compat_version = cpu_to_le16(1);
@@ -1217,7 +1237,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- if (mds >= mdsc->mdsmap->m_num_mds)
+ if (mds >= mdsc->mdsmap->possible_max_rank)
return;
mi = &mdsc->mdsmap->m_info[mds];
@@ -1270,6 +1290,7 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
{
struct ceph_mds_request *req;
struct rb_node *p;
+ struct ceph_inode_info *ci;
dout("cleanup_session_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
@@ -1278,6 +1299,16 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
struct ceph_mds_request, r_unsafe_item);
pr_warn_ratelimited(" dropping unsafe request %llu\n",
req->r_tid);
+ if (req->r_target_inode) {
+ /* dropping unsafe change of inode's attributes */
+ ci = ceph_inode(req->r_target_inode);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
+ if (req->r_unsafe_dir) {
+ /* dropping unsafe directory operation */
+ ci = ceph_inode(req->r_unsafe_dir);
+ errseq_set(&ci->i_meta_err, -EIO);
+ }
__unregister_request(mdsc, req);
}
/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1370,7 +1401,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
- bool drop = false;
+ bool dirty_dropped = false;
bool invalidate = false;
dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1383,9 +1414,12 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc = fsc->mdsc;
- if (ci->i_wrbuffer_ref > 0 &&
- READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
- invalidate = true;
+ if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+ if (inode->i_data.nrpages > 0)
+ invalidate = true;
+ if (ci->i_wrbuffer_ref > 0)
+ mapping_set_error(&inode->i_data, -EIO);
+ }
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1405,7 +1439,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
- drop = true;
+ dirty_dropped = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
@@ -1415,10 +1449,22 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
- drop = true;
+ dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
+ if (dirty_dropped) {
+ errseq_set(&ci->i_meta_err, -EIO);
+
+ if (ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_wr_ref == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
+ }
+
if (atomic_read(&ci->i_filelock_ref) > 0) {
/* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
@@ -1430,15 +1476,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
-
- if (drop &&
- ci->i_wrbuffer_ref_head == 0 &&
- ci->i_wr_ref == 0 &&
- ci->i_dirty_caps == 0 &&
- ci->i_flushing_caps == 0) {
- ceph_put_snap_context(ci->i_head_snapc);
- ci->i_head_snapc = NULL;
- }
}
spin_unlock(&ci->i_ceph_lock);
while (!list_empty(&to_remove)) {
@@ -1452,7 +1489,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
wake_up_all(&ci->i_cap_wq);
if (invalidate)
ceph_queue_invalidate(inode);
- if (drop)
+ if (dirty_dropped)
iput(inode);
return 0;
}
@@ -1705,11 +1742,11 @@ out:
*/
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
- struct ceph_mds_session *session = arg;
+ int *remaining = arg;
struct ceph_inode_info *ci = ceph_inode(inode);
int used, wanted, oissued, mine;
- if (session->s_trim_caps <= 0)
+ if (*remaining <= 0)
return -1;
spin_lock(&ci->i_ceph_lock);
@@ -1746,7 +1783,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
if (oissued) {
/* we aren't the only cap.. just remove us */
__ceph_remove_cap(cap, true);
- session->s_trim_caps--;
+ (*remaining)--;
} else {
struct dentry *dentry;
/* try dropping referring dentries */
@@ -1758,7 +1795,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
d_prune_aliases(inode);
count = atomic_read(&inode->i_count);
if (count == 1)
- session->s_trim_caps--;
+ (*remaining)--;
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
inode, cap, count);
} else {
@@ -1784,12 +1821,12 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
dout("trim_caps mds%d start: %d / %d, trim %d\n",
session->s_mds, session->s_nr_caps, max_caps, trim_caps);
if (trim_caps > 0) {
- session->s_trim_caps = trim_caps;
- ceph_iterate_session_caps(session, trim_caps_cb, session);
+ int remaining = trim_caps;
+
+ ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
session->s_mds, session->s_nr_caps, max_caps,
- trim_caps - session->s_trim_caps);
- session->s_trim_caps = 0;
+ trim_caps - remaining);
}
ceph_flush_cap_releases(mdsc, session);
@@ -1948,7 +1985,7 @@ void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
if (mdsc->stopping)
return;
- get_session(session);
+ ceph_get_mds_session(session);
if (queue_work(mdsc->fsc->cap_wq,
&session->s_cap_release_work)) {
dout("cap release work queued\n");
@@ -1998,7 +2035,7 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
if (!nr)
return;
val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
- if (!(val % CEPH_CAPS_PER_RELEASE)) {
+ if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
atomic_set(&mdsc->cap_reclaim_pending, 0);
ceph_queue_cap_reclaim_work(mdsc);
}
@@ -2015,12 +2052,13 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
size_t size = sizeof(struct ceph_mds_reply_dir_entry);
- int order, num_entries;
+ unsigned int num_entries;
+ int order;
spin_lock(&ci->i_ceph_lock);
num_entries = ci->i_files + ci->i_subdirs;
spin_unlock(&ci->i_ceph_lock);
- num_entries = max(num_entries, 1);
+ num_entries = max(num_entries, 1U);
num_entries = min(num_entries, opt->max_readdir);
order = get_order(size * num_entries);
@@ -2052,7 +2090,6 @@ struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
- struct timespec64 ts;
if (!req)
return ERR_PTR(-ENOMEM);
@@ -2071,8 +2108,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
- ktime_get_coarse_real_ts64(&ts);
- req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran);
+ ktime_get_coarse_real_ts64(&req->r_stamp);
req->r_op = op;
req->r_direct_mode = mode;
@@ -2165,13 +2201,17 @@ retry:
}
base = ceph_ino(d_inode(temp));
rcu_read_unlock();
- if (pos < 0 || read_seqretry(&rename_lock, seq)) {
- pr_err("build_path did not end path lookup where "
- "expected, pos is %d\n", pos);
- /* presumably this is only possible if racing with a
- rename of one of the parent directories (we can not
- lock the dentries above us to prevent this, but
- retrying should be harmless) */
+
+ if (read_seqretry(&rename_lock, seq))
+ goto retry;
+
+ if (pos < 0) {
+ /*
+ * A rename didn't occur, but somehow we didn't end up where
+ * we thought we would. Throw a warning and try again.
+ */
+ pr_warn("build_path did not end path lookup where "
+ "expected, pos is %d\n", pos);
goto retry;
}
@@ -2328,6 +2368,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->op = cpu_to_le32(req->r_op);
head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
+ head->ino = 0;
head->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
@@ -2493,6 +2534,26 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
}
/*
+ * called under mdsc->mutex
+ */
+static int __send_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ struct ceph_mds_request *req,
+ bool drop_cap_releases)
+{
+ int err;
+
+ err = __prepare_send_request(mdsc, req, session->s_mds,
+ drop_cap_releases);
+ if (!err) {
+ ceph_msg_get(req->r_request);
+ ceph_con_send(&session->s_con, req->r_request);
+ }
+
+ return err;
+}
+
+/*
* send request, or put it on the appropriate wait list.
*/
static void __do_request(struct ceph_mds_client *mdsc,
@@ -2501,6 +2562,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session = NULL;
int mds = -1;
int err = 0;
+ bool random;
if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
@@ -2533,15 +2595,14 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (!(mdsc->fsc->mount_options->flags &
CEPH_MOUNT_OPT_MOUNTWAIT) &&
!ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
- err = -ENOENT;
- pr_info("probably no mds server is up\n");
+ err = -EHOSTUNREACH;
goto finish;
}
}
put_request_session(req);
- mds = __choose_mds(mdsc, req);
+ mds = __choose_mds(mdsc, req, &random);
if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
dout("do_request no mds or not active, waiting for map\n");
@@ -2558,7 +2619,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
goto finish;
}
}
- req->r_session = get_session(session);
+ req->r_session = ceph_get_mds_session(session);
dout("do_request mds%d session %p state %s\n", mds, session,
ceph_session_state_name(session->s_state));
@@ -2569,8 +2630,12 @@ static void __do_request(struct ceph_mds_client *mdsc,
goto out_session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
+ session->s_state == CEPH_MDS_SESSION_CLOSING) {
__open_session(mdsc, session);
+ /* retry the same mds later */
+ if (random)
+ req->r_resend_mds = mds;
+ }
list_add(&req->r_wait, &session->s_waiting);
goto out_session;
}
@@ -2581,11 +2646,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
if (req->r_request_started == 0) /* note request start time */
req->r_request_started = jiffies;
- err = __prepare_send_request(mdsc, req, mds, false);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
+ err = __send_request(mdsc, session, req, false);
out_session:
ceph_put_mds_session(session);
@@ -2653,8 +2714,10 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
- if (req->r_parent)
+ if (req->r_parent) {
ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+ ihold(req->r_parent);
+ }
if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
@@ -2836,7 +2899,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&mdsc->mutex);
goto out;
} else {
- int mds = __choose_mds(mdsc, req);
+ int mds = __choose_mds(mdsc, req, NULL);
if (mds >= 0 && mds != req->r_session->s_mds) {
dout("but auth changed, so resending\n");
__do_request(mdsc, req);
@@ -2852,6 +2915,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
__unregister_request(mdsc, req);
+ /* last request during umount? */
+ if (mdsc->stopping && !__get_oldest_req(mdsc))
+ complete_all(&mdsc->safe_umount_waiters);
+
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
/*
* We already handled the unsafe response, now do the
@@ -2862,9 +2929,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
*/
dout("got safe reply %llu, mds%d\n", tid, mds);
- /* last unsafe request during umount? */
- if (mdsc->stopping && !__get_oldest_req(mdsc))
- complete_all(&mdsc->safe_umount_waiters);
mutex_unlock(&mdsc->mutex);
goto out;
}
@@ -3015,18 +3079,23 @@ bad:
pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
-static int __decode_and_drop_session_metadata(void **p, void *end)
+static int __decode_session_metadata(void **p, void *end,
+ bool *blacklisted)
{
/* map<string,string> */
u32 n;
+ bool err_str;
ceph_decode_32_safe(p, end, n, bad);
while (n-- > 0) {
u32 len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
+ err_str = !strncmp(*p, "error_string", len);
*p += len;
ceph_decode_32_safe(p, end, len, bad);
ceph_decode_need(p, end, len, bad);
+ if (err_str && strnstr(*p, "blacklisted", len))
+ *blacklisted = true;
*p += len;
}
return 0;
@@ -3050,6 +3119,7 @@ static void handle_session(struct ceph_mds_session *session,
u64 seq;
unsigned long features = 0;
int wake = 0;
+ bool blacklisted = false;
/* decode */
ceph_decode_need(&p, end, sizeof(*h), bad);
@@ -3062,7 +3132,7 @@ static void handle_session(struct ceph_mds_session *session,
if (msg_version >= 3) {
u32 len;
/* version >= 2, metadata */
- if (__decode_and_drop_session_metadata(&p, end) < 0)
+ if (__decode_session_metadata(&p, end, &blacklisted) < 0)
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
@@ -3073,7 +3143,7 @@ static void handle_session(struct ceph_mds_session *session,
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
- get_session(session);
+ ceph_get_mds_session(session);
__unregister_session(mdsc, session);
}
/* FIXME: this ttl calculation is generous */
@@ -3111,6 +3181,7 @@ static void handle_session(struct ceph_mds_session *session,
case CEPH_SESSION_CLOSE:
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info("mds%d reconnect denied\n", session->s_mds);
+ session->s_state = CEPH_MDS_SESSION_CLOSED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
wake = 2; /* for good measure */
@@ -3149,6 +3220,8 @@ static void handle_session(struct ceph_mds_session *session,
session->s_state = CEPH_MDS_SESSION_REJECTED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
+ if (blacklisted)
+ mdsc->fsc->blacklisted = true;
wake = 2; /* for good measure */
break;
@@ -3176,7 +3249,6 @@ bad:
return;
}
-
/*
* called under session->mutex.
*/
@@ -3185,18 +3257,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
{
struct ceph_mds_request *req, *nreq;
struct rb_node *p;
- int err;
dout("replay_unsafe_requests mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex);
- list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
- err = __prepare_send_request(mdsc, req, session->s_mds, true);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
- }
+ list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
+ __send_request(mdsc, session, req, true);
/*
* also re-send old requests when MDS enters reconnect stage. So that MDS
@@ -3211,14 +3277,8 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
if (req->r_attempts == 0)
continue; /* only old requests */
if (req->r_session &&
- req->r_session->s_mds == session->s_mds) {
- err = __prepare_send_request(mdsc, req,
- session->s_mds, true);
- if (!err) {
- ceph_msg_get(req->r_request);
- ceph_con_send(&session->s_con, req->r_request);
- }
- }
+ req->r_session->s_mds == session->s_mds)
+ __send_request(mdsc, session, req, true);
}
mutex_unlock(&mdsc->mutex);
}
@@ -3729,7 +3789,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
dout("check_new_map new %u old %u\n",
newmap->m_epoch, oldmap->m_epoch);
- for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
if (!mdsc->sessions[i])
continue;
s = mdsc->sessions[i];
@@ -3743,9 +3803,9 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
ceph_session_state_name(s->s_state));
- if (i >= newmap->m_num_mds) {
+ if (i >= newmap->possible_max_rank) {
/* force close session for stopped mds */
- get_session(s);
+ ceph_get_mds_session(s);
__unregister_session(mdsc, s);
__wake_requests(mdsc, &s->s_waiting);
mutex_unlock(&mdsc->mutex);
@@ -3800,7 +3860,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
}
}
- for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
+ for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i];
if (!s)
continue;
@@ -3998,7 +4058,27 @@ static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
mutex_unlock(&mdsc->mutex);
}
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+ struct ceph_fs_client *fsc = mdsc->fsc;
+ if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
+ return;
+
+ if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
+ return;
+
+ if (!READ_ONCE(fsc->blacklisted))
+ return;
+
+ if (fsc->last_auto_reconnect &&
+ time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+ return;
+
+ pr_info("auto reconnect after blacklisted\n");
+ fsc->last_auto_reconnect = jiffies;
+ ceph_force_reconnect(fsc->sb);
+}
/*
* delayed work -- periodically trim expired leases, renew caps with mds
@@ -4044,7 +4124,9 @@ static void delayed_work(struct work_struct *work)
pr_info("mds%d hung\n", s->s_mds);
}
}
- if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+ if (s->s_state == CEPH_MDS_SESSION_NEW ||
+ s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+ s->s_state == CEPH_MDS_SESSION_REJECTED) {
/* this mds is failed or recovering, just wait */
ceph_put_mds_session(s);
continue;
@@ -4072,6 +4154,8 @@ static void delayed_work(struct work_struct *work)
ceph_trim_snapid_map(mdsc);
+ maybe_recover_session(mdsc);
+
schedule_delayed(mdsc);
}
@@ -4114,6 +4198,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
mdsc->last_renew_caps = jiffies;
INIT_LIST_HEAD(&mdsc->cap_delay_list);
+ INIT_LIST_HEAD(&mdsc->cap_wait_list);
spin_lock_init(&mdsc->cap_delay_lock);
INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock);
@@ -4321,7 +4406,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
if (mdsc->sessions[i]) {
- session = get_session(mdsc->sessions[i]);
+ session = ceph_get_mds_session(mdsc->sessions[i]);
__unregister_session(mdsc, session);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
@@ -4355,7 +4440,12 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
session = __ceph_lookup_mds_session(mdsc, mds);
if (!session)
continue;
+
+ if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+ __unregister_session(mdsc, session);
+ __wake_requests(mdsc, &session->s_waiting);
mutex_unlock(&mdsc->mutex);
+
mutex_lock(&session->s_mutex);
__close_session(mdsc, session);
if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -4364,6 +4454,7 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
+
mutex_lock(&mdsc->mutex);
kick_requests(mdsc, mds);
}
@@ -4543,11 +4634,8 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- if (get_session(s)) {
- dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
+ if (ceph_get_mds_session(s))
return con;
- }
- dout("mdsc con_get %p FAIL\n", s);
return NULL;
}
@@ -4555,7 +4643,6 @@ static void con_put(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index f7c8603484fe..27a7446e10d3 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -17,22 +17,31 @@
#include <linux/ceph/auth.h>
/* The first 8 bits are reserved for old ceph releases */
-#define CEPHFS_FEATURE_MIMIC 8
-#define CEPHFS_FEATURE_REPLY_ENCODING 9
-#define CEPHFS_FEATURE_RECLAIM_CLIENT 10
-#define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
-#define CEPHFS_FEATURE_MULTI_RECONNECT 12
+enum ceph_feature_type {
+ CEPHFS_FEATURE_MIMIC = 8,
+ CEPHFS_FEATURE_REPLY_ENCODING,
+ CEPHFS_FEATURE_RECLAIM_CLIENT,
+ CEPHFS_FEATURE_LAZY_CAP_WANTED,
+ CEPHFS_FEATURE_MULTI_RECONNECT,
+
+ CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_MULTI_RECONNECT,
+};
-#define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
+/*
+ * This will always have the highest feature bit value
+ * as the last element of the array.
+ */
+#define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
0, 1, 2, 3, 4, 5, 6, 7, \
CEPHFS_FEATURE_MIMIC, \
CEPHFS_FEATURE_REPLY_ENCODING, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
CEPHFS_FEATURE_MULTI_RECONNECT, \
+ \
+ CEPHFS_FEATURE_MAX, \
}
#define CEPHFS_FEATURES_CLIENT_REQUIRED {}
-
/*
* Some lock dependencies:
*
@@ -148,10 +157,11 @@ enum {
CEPH_MDS_SESSION_OPENING = 2,
CEPH_MDS_SESSION_OPEN = 3,
CEPH_MDS_SESSION_HUNG = 4,
- CEPH_MDS_SESSION_CLOSING = 5,
- CEPH_MDS_SESSION_RESTARTING = 6,
- CEPH_MDS_SESSION_RECONNECTING = 7,
- CEPH_MDS_SESSION_REJECTED = 8,
+ CEPH_MDS_SESSION_RESTARTING = 5,
+ CEPH_MDS_SESSION_RECONNECTING = 6,
+ CEPH_MDS_SESSION_CLOSING = 7,
+ CEPH_MDS_SESSION_CLOSED = 8,
+ CEPH_MDS_SESSION_REJECTED = 9,
};
struct ceph_mds_session {
@@ -174,9 +184,10 @@ struct ceph_mds_session {
/* protected by s_cap_lock */
spinlock_t s_cap_lock;
+ refcount_t s_ref;
struct list_head s_caps; /* all caps issued by this session */
struct ceph_cap *s_cap_iterator;
- int s_nr_caps, s_trim_caps;
+ int s_nr_caps;
int s_num_cap_releases;
int s_cap_reconnect;
int s_readonly;
@@ -188,7 +199,6 @@ struct ceph_mds_session {
unsigned long s_renew_requested; /* last time we sent a renew req */
u64 s_renew_seq;
- refcount_t s_ref;
struct list_head s_waiting; /* waiting requests */
struct list_head s_unsafe; /* unsafe requests */
};
@@ -224,6 +234,7 @@ struct ceph_mds_request {
struct rb_node r_node;
struct ceph_mds_client *r_mdsc;
+ struct kref r_kref;
int r_op; /* mds op code */
/* operation on what? */
@@ -294,7 +305,6 @@ struct ceph_mds_request {
int r_resend_mds; /* mds to resend to next, if any*/
u32 r_sent_on_mseq; /* cap mseq request was sent at*/
- struct kref r_kref;
struct list_head r_wait;
struct completion r_completion;
struct completion r_safe_completion;
@@ -340,6 +350,14 @@ struct ceph_quotarealm_inode {
struct inode *inode;
};
+struct cap_wait {
+ struct list_head list;
+ unsigned long ino;
+ pid_t tgid;
+ int need;
+ int want;
+};
+
/*
* mds client state
*/
@@ -416,6 +434,7 @@ struct ceph_mds_client {
spinlock_t caps_list_lock;
struct list_head caps_list; /* unused (reserved or
unreserved) */
+ struct list_head cap_wait_list;
int caps_total_count; /* total caps allocated */
int caps_use_count; /* in use */
int caps_use_max; /* max used caps */
@@ -442,15 +461,10 @@ extern const char *ceph_mds_op_name(int op);
extern struct ceph_mds_session *
__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
-static inline struct ceph_mds_session *
-ceph_get_mds_session(struct ceph_mds_session *s)
-{
- refcount_inc(&s->s_ref);
- return s;
-}
-
extern const char *ceph_session_state_name(int s);
+extern struct ceph_mds_session *
+ceph_get_mds_session(struct ceph_mds_session *s);
extern void ceph_put_mds_session(struct ceph_mds_session *s);
extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc,
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index ce2d00da5096..889627817e52 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -13,35 +13,47 @@
#include "super.h"
+#define CEPH_MDS_IS_READY(i, ignore_laggy) \
+ (m->m_info[i].state > 0 && ignore_laggy ? true : !m->m_info[i].laggy)
-/*
- * choose a random mds that is "up" (i.e. has a state > 0), or -1.
- */
-int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
+static int __mdsmap_get_random_mds(struct ceph_mdsmap *m, bool ignore_laggy)
{
int n = 0;
- int i;
-
- /* special case for one mds */
- if (1 == m->m_num_mds && m->m_info[0].state > 0)
- return 0;
+ int i, j;
/* count */
- for (i = 0; i < m->m_num_mds; i++)
- if (m->m_info[i].state > 0)
+ for (i = 0; i < m->possible_max_rank; i++)
+ if (CEPH_MDS_IS_READY(i, ignore_laggy))
n++;
if (n == 0)
return -1;
/* pick */
n = prandom_u32() % n;
- for (i = 0; n > 0; i++, n--)
- while (m->m_info[i].state <= 0)
- i++;
+ for (j = 0, i = 0; i < m->possible_max_rank; i++) {
+ if (CEPH_MDS_IS_READY(i, ignore_laggy))
+ j++;
+ if (j > n)
+ break;
+ }
return i;
}
+/*
+ * choose a random mds that is "up" (i.e. has a state > 0), or -1.
+ */
+int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
+{
+ int mds;
+
+ mds = __mdsmap_get_random_mds(m, false);
+ if (mds == m->possible_max_rank || mds == -1)
+ mds = __mdsmap_get_random_mds(m, true);
+
+ return mds == m->possible_max_rank ? -1 : mds;
+}
+
#define __decode_and_drop_type(p, end, type, bad) \
do { \
if (*p + sizeof(type) > end) \
@@ -135,14 +147,29 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_session_autoclose = ceph_decode_32(p);
m->m_max_file_size = ceph_decode_64(p);
m->m_max_mds = ceph_decode_32(p);
- m->m_num_mds = m->m_max_mds;
- m->m_info = kcalloc(m->m_num_mds, sizeof(*m->m_info), GFP_NOFS);
+ /*
+ * pick out the active nodes as the m_num_active_mds, the
+ * m_num_active_mds maybe larger than m_max_mds when decreasing
+ * the max_mds in cluster side, in other case it should less
+ * than or equal to m_max_mds.
+ */
+ m->m_num_active_mds = n = ceph_decode_32(p);
+
+ /*
+ * the possible max rank, it maybe larger than the m_num_active_mds,
+ * for example if the mds_max == 2 in the cluster, when the MDS(0)
+ * was laggy and being replaced by a new MDS, we will temporarily
+ * receive a new mds map with n_num_mds == 1 and the active MDS(1),
+ * and the mds rank >= m_num_active_mds.
+ */
+ m->possible_max_rank = max(m->m_num_active_mds, m->m_max_mds);
+
+ m->m_info = kcalloc(m->possible_max_rank, sizeof(*m->m_info), GFP_NOFS);
if (!m->m_info)
goto nomem;
/* pick out active nodes from mds_info (state > 0) */
- n = ceph_decode_32(p);
for (i = 0; i < n; i++) {
u64 global_id;
u32 namelen;
@@ -155,6 +182,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
void *pexport_targets = NULL;
struct ceph_timespec laggy_since;
struct ceph_mds_info *info;
+ bool laggy;
ceph_decode_need(p, end, sizeof(u64) + 1, bad);
global_id = ceph_decode_64(p);
@@ -187,6 +215,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
if (err)
goto corrupt;
ceph_decode_copy(p, &laggy_since, sizeof(laggy_since));
+ laggy = laggy_since.tv_sec != 0 || laggy_since.tv_nsec != 0;
*p += sizeof(u32);
ceph_decode_32_safe(p, end, namelen, bad);
*p += namelen;
@@ -204,31 +233,28 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
*p = info_end;
}
- dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",
+ dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s%s\n",
i+1, n, global_id, mds, inc,
ceph_pr_addr(&addr),
- ceph_mds_state_name(state));
+ ceph_mds_state_name(state),
+ laggy ? "(laggy)" : "");
- if (mds < 0 || state <= 0)
+ if (mds < 0 || mds >= m->possible_max_rank) {
+ pr_warn("mdsmap_decode got incorrect mds(%d)\n", mds);
continue;
+ }
- if (mds >= m->m_num_mds) {
- int new_num = max(mds + 1, m->m_num_mds * 2);
- void *new_m_info = krealloc(m->m_info,
- new_num * sizeof(*m->m_info),
- GFP_NOFS | __GFP_ZERO);
- if (!new_m_info)
- goto nomem;
- m->m_info = new_m_info;
- m->m_num_mds = new_num;
+ if (state <= 0) {
+ pr_warn("mdsmap_decode got incorrect state(%s)\n",
+ ceph_mds_state_name(state));
+ continue;
}
info = &m->m_info[mds];
info->global_id = global_id;
info->state = state;
info->addr = addr;
- info->laggy = (laggy_since.tv_sec != 0 ||
- laggy_since.tv_nsec != 0);
+ info->laggy = laggy;
info->num_export_targets = num_export_targets;
if (num_export_targets) {
info->export_targets = kcalloc(num_export_targets,
@@ -242,14 +268,6 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
info->export_targets = NULL;
}
}
- if (m->m_num_mds > m->m_max_mds) {
- /* find max up mds */
- for (i = m->m_num_mds; i >= m->m_max_mds; i--) {
- if (i == 0 || m->m_info[i-1].state > 0)
- break;
- }
- m->m_num_mds = i;
- }
/* pg_pools */
ceph_decode_32_safe(p, end, n, bad);
@@ -291,14 +309,14 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
for (i = 0; i < n; i++) {
s32 mds = ceph_decode_32(p);
- if (mds >= 0 && mds < m->m_num_mds) {
+ if (mds >= 0 && mds < m->possible_max_rank) {
if (m->m_info[mds].laggy)
num_laggy++;
}
}
m->m_num_laggy = num_laggy;
- if (n > m->m_num_mds) {
+ if (n > m->possible_max_rank) {
void *new_m_info = krealloc(m->m_info,
n * sizeof(*m->m_info),
GFP_NOFS | __GFP_ZERO);
@@ -306,7 +324,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
goto nomem;
m->m_info = new_m_info;
}
- m->m_num_mds = n;
+ m->possible_max_rank = n;
}
/* inc */
@@ -352,6 +370,8 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
m->m_damaged = false;
}
bad_ext:
+ dout("mdsmap_decode m_enabled: %d, m_damaged: %d, m_num_laggy: %d\n",
+ !!m->m_enabled, !!m->m_damaged, m->m_num_laggy);
*p = end;
dout("mdsmap_decode success epoch %u\n", m->m_epoch);
return m;
@@ -375,7 +395,7 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
{
int i;
- for (i = 0; i < m->m_num_mds; i++)
+ for (i = 0; i < m->possible_max_rank; i++)
kfree(m->m_info[i].export_targets);
kfree(m->m_info);
kfree(m->m_data_pg_pools);
@@ -389,9 +409,9 @@ bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
return false;
if (m->m_damaged)
return false;
- if (m->m_num_laggy > 0)
+ if (m->m_num_laggy == m->m_num_active_mds)
return false;
- for (i = 0; i < m->m_num_mds; i++) {
+ for (i = 0; i < m->possible_max_rank; i++) {
if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
nr_active++;
}
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4c6494eb02b5..ccfcc66aaf44 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -465,6 +465,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
struct ceph_snap_context *old_snapc, *new_snapc;
+ struct ceph_buffer *old_blob = NULL;
int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@@ -541,7 +542,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
capsnap->gid = inode->i_gid;
if (dirty & CEPH_CAP_XATTR_EXCL) {
- __ceph_build_xattrs_blob(ci);
+ old_blob = __ceph_build_xattrs_blob(ci);
capsnap->xattr_blob =
ceph_buffer_get(ci->i_xattrs.blob);
capsnap->xattr_version = ci->i_xattrs.version;
@@ -584,6 +585,7 @@ update_snapc:
}
spin_unlock(&ci->i_ceph_lock);
+ ceph_buffer_put(old_blob);
kfree(capsnap);
ceph_put_snap_context(old_snapc);
}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ab4868c7308e..c7f150686a53 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -9,7 +9,8 @@
#include <linux/in6.h>
#include <linux/module.h>
#include <linux/mount.h>
-#include <linux/parser.h>
+#include <linux/fs_context.h>
+#include <linux/fs_parser.h>
#include <linux/sched.h>
#include <linux/seq_file.h>
#include <linux/slab.h>
@@ -106,7 +107,6 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
return 0;
}
-
static int ceph_sync_fs(struct super_block *sb, int wait)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
@@ -138,258 +138,327 @@ enum {
Opt_readdir_max_entries,
Opt_readdir_max_bytes,
Opt_congestion_kb,
- Opt_last_int,
/* int args above */
Opt_snapdirname,
Opt_mds_namespace,
- Opt_fscache_uniq,
- Opt_last_string,
+ Opt_recover_session,
+ Opt_source,
/* string args above */
Opt_dirstat,
- Opt_nodirstat,
Opt_rbytes,
- Opt_norbytes,
Opt_asyncreaddir,
- Opt_noasyncreaddir,
Opt_dcache,
- Opt_nodcache,
Opt_ino32,
- Opt_noino32,
Opt_fscache,
- Opt_nofscache,
Opt_poolperm,
- Opt_nopoolperm,
Opt_require_active_mds,
- Opt_norequire_active_mds,
-#ifdef CONFIG_CEPH_FS_POSIX_ACL
Opt_acl,
-#endif
- Opt_noacl,
Opt_quotadf,
- Opt_noquotadf,
Opt_copyfrom,
- Opt_nocopyfrom,
};
-static match_table_t fsopt_tokens = {
- {Opt_wsize, "wsize=%d"},
- {Opt_rsize, "rsize=%d"},
- {Opt_rasize, "rasize=%d"},
- {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
- {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
- {Opt_caps_max, "caps_max=%d"},
- {Opt_readdir_max_entries, "readdir_max_entries=%d"},
- {Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
- {Opt_congestion_kb, "write_congestion_kb=%d"},
- /* int args above */
- {Opt_snapdirname, "snapdirname=%s"},
- {Opt_mds_namespace, "mds_namespace=%s"},
- {Opt_fscache_uniq, "fsc=%s"},
- /* string args above */
- {Opt_dirstat, "dirstat"},
- {Opt_nodirstat, "nodirstat"},
- {Opt_rbytes, "rbytes"},
- {Opt_norbytes, "norbytes"},
- {Opt_asyncreaddir, "asyncreaddir"},
- {Opt_noasyncreaddir, "noasyncreaddir"},
- {Opt_dcache, "dcache"},
- {Opt_nodcache, "nodcache"},
- {Opt_ino32, "ino32"},
- {Opt_noino32, "noino32"},
- {Opt_fscache, "fsc"},
- {Opt_nofscache, "nofsc"},
- {Opt_poolperm, "poolperm"},
- {Opt_nopoolperm, "nopoolperm"},
- {Opt_require_active_mds, "require_active_mds"},
- {Opt_norequire_active_mds, "norequire_active_mds"},
-#ifdef CONFIG_CEPH_FS_POSIX_ACL
- {Opt_acl, "acl"},
-#endif
- {Opt_noacl, "noacl"},
- {Opt_quotadf, "quotadf"},
- {Opt_noquotadf, "noquotadf"},
- {Opt_copyfrom, "copyfrom"},
- {Opt_nocopyfrom, "nocopyfrom"},
- {-1, NULL}
+enum ceph_recover_session_mode {
+ ceph_recover_session_no,
+ ceph_recover_session_clean
};
-static int parse_fsopt_token(char *c, void *private)
+static const struct constant_table ceph_param_recover[] = {
+ { "no", ceph_recover_session_no },
+ { "clean", ceph_recover_session_clean },
+ {}
+};
+
+static const struct fs_parameter_spec ceph_mount_parameters[] = {
+ fsparam_flag_no ("acl", Opt_acl),
+ fsparam_flag_no ("asyncreaddir", Opt_asyncreaddir),
+ fsparam_s32 ("caps_max", Opt_caps_max),
+ fsparam_u32 ("caps_wanted_delay_max", Opt_caps_wanted_delay_max),
+ fsparam_u32 ("caps_wanted_delay_min", Opt_caps_wanted_delay_min),
+ fsparam_u32 ("write_congestion_kb", Opt_congestion_kb),
+ fsparam_flag_no ("copyfrom", Opt_copyfrom),
+ fsparam_flag_no ("dcache", Opt_dcache),
+ fsparam_flag_no ("dirstat", Opt_dirstat),
+ fsparam_flag_no ("fsc", Opt_fscache), // fsc|nofsc
+ fsparam_string ("fsc", Opt_fscache), // fsc=...
+ fsparam_flag_no ("ino32", Opt_ino32),
+ fsparam_string ("mds_namespace", Opt_mds_namespace),
+ fsparam_flag_no ("poolperm", Opt_poolperm),
+ fsparam_flag_no ("quotadf", Opt_quotadf),
+ fsparam_u32 ("rasize", Opt_rasize),
+ fsparam_flag_no ("rbytes", Opt_rbytes),
+ fsparam_u32 ("readdir_max_bytes", Opt_readdir_max_bytes),
+ fsparam_u32 ("readdir_max_entries", Opt_readdir_max_entries),
+ fsparam_enum ("recover_session", Opt_recover_session, ceph_param_recover),
+ fsparam_flag_no ("require_active_mds", Opt_require_active_mds),
+ fsparam_u32 ("rsize", Opt_rsize),
+ fsparam_string ("snapdirname", Opt_snapdirname),
+ fsparam_string ("source", Opt_source),
+ fsparam_u32 ("wsize", Opt_wsize),
+ {}
+};
+
+struct ceph_parse_opts_ctx {
+ struct ceph_options *copts;
+ struct ceph_mount_options *opts;
+};
+
+/*
+ * Remove adjacent slashes and then the trailing slash, unless it is
+ * the only remaining character.
+ *
+ * E.g. "//dir1////dir2///" --> "/dir1/dir2", "///" --> "/".
+ */
+static void canonicalize_path(char *path)
{
- struct ceph_mount_options *fsopt = private;
- substring_t argstr[MAX_OPT_ARGS];
- int token, intval, ret;
+ int i, j = 0;
- token = match_token((char *)c, fsopt_tokens, argstr);
- if (token < 0)
- return -EINVAL;
+ for (i = 0; path[i] != '\0'; i++) {
+ if (path[i] != '/' || j < 1 || path[j - 1] != '/')
+ path[j++] = path[i];
+ }
- if (token < Opt_last_int) {
- ret = match_int(&argstr[0], &intval);
- if (ret < 0) {
- pr_err("bad option arg (not int) at '%s'\n", c);
- return ret;
- }
- dout("got int token %d val %d\n", token, intval);
- } else if (token > Opt_last_int && token < Opt_last_string) {
- dout("got string token %d val %s\n", token,
- argstr[0].from);
+ if (j > 1 && path[j - 1] == '/')
+ j--;
+ path[j] = '\0';
+}
+
+/*
+ * Parse the source parameter. Distinguish the server list from the path.
+ *
+ * The source will look like:
+ * <server_spec>[,<server_spec>...]:[<path>]
+ * where
+ * <server_spec> is <ip>[:<port>]
+ * <path> is optional, but if present must begin with '/'
+ */
+static int ceph_parse_source(struct fs_parameter *param, struct fs_context *fc)
+{
+ struct ceph_parse_opts_ctx *pctx = fc->fs_private;
+ struct ceph_mount_options *fsopt = pctx->opts;
+ char *dev_name = param->string, *dev_name_end;
+ int ret;
+
+ dout("%s '%s'\n", __func__, dev_name);
+ if (!dev_name || !*dev_name)
+ return invalfc(fc, "Empty source");
+
+ dev_name_end = strchr(dev_name, '/');
+ if (dev_name_end) {
+ /*
+ * The server_path will include the whole chars from userland
+ * including the leading '/'.
+ */
+ kfree(fsopt->server_path);
+ fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
+ if (!fsopt->server_path)
+ return -ENOMEM;
+
+ canonicalize_path(fsopt->server_path);
} else {
- dout("got token %d\n", token);
+ dev_name_end = dev_name + strlen(dev_name);
}
+ dev_name_end--; /* back up to ':' separator */
+ if (dev_name_end < dev_name || *dev_name_end != ':')
+ return invalfc(fc, "No path or : separator in source");
+
+ dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
+ if (fsopt->server_path)
+ dout("server path '%s'\n", fsopt->server_path);
+
+ ret = ceph_parse_mon_ips(param->string, dev_name_end - dev_name,
+ pctx->copts, fc->log.log);
+ if (ret)
+ return ret;
+
+ fc->source = param->string;
+ param->string = NULL;
+ return 0;
+}
+
+static int ceph_parse_mount_param(struct fs_context *fc,
+ struct fs_parameter *param)
+{
+ struct ceph_parse_opts_ctx *pctx = fc->fs_private;
+ struct ceph_mount_options *fsopt = pctx->opts;
+ struct fs_parse_result result;
+ unsigned int mode;
+ int token, ret;
+
+ ret = ceph_parse_param(param, pctx->copts, fc->log.log);
+ if (ret != -ENOPARAM)
+ return ret;
+
+ token = fs_parse(fc, ceph_mount_parameters, param, &result);
+ dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
+ if (token < 0)
+ return token;
+
switch (token) {
case Opt_snapdirname:
kfree(fsopt->snapdir_name);
- fsopt->snapdir_name = kstrndup(argstr[0].from,
- argstr[0].to-argstr[0].from,
- GFP_KERNEL);
- if (!fsopt->snapdir_name)
- return -ENOMEM;
+ fsopt->snapdir_name = param->string;
+ param->string = NULL;
break;
case Opt_mds_namespace:
kfree(fsopt->mds_namespace);
- fsopt->mds_namespace = kstrndup(argstr[0].from,
- argstr[0].to-argstr[0].from,
- GFP_KERNEL);
- if (!fsopt->mds_namespace)
- return -ENOMEM;
+ fsopt->mds_namespace = param->string;
+ param->string = NULL;
break;
- case Opt_fscache_uniq:
- kfree(fsopt->fscache_uniq);
- fsopt->fscache_uniq = kstrndup(argstr[0].from,
- argstr[0].to-argstr[0].from,
- GFP_KERNEL);
- if (!fsopt->fscache_uniq)
- return -ENOMEM;
- fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
+ case Opt_recover_session:
+ mode = result.uint_32;
+ if (mode == ceph_recover_session_no)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_CLEANRECOVER;
+ else if (mode == ceph_recover_session_clean)
+ fsopt->flags |= CEPH_MOUNT_OPT_CLEANRECOVER;
+ else
+ BUG();
break;
- /* misc */
+ case Opt_source:
+ if (fc->source)
+ return invalfc(fc, "Multiple sources specified");
+ return ceph_parse_source(param, fc);
case Opt_wsize:
- if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
- return -EINVAL;
- fsopt->wsize = ALIGN(intval, PAGE_SIZE);
+ if (result.uint_32 < PAGE_SIZE ||
+ result.uint_32 > CEPH_MAX_WRITE_SIZE)
+ goto out_of_range;
+ fsopt->wsize = ALIGN(result.uint_32, PAGE_SIZE);
break;
case Opt_rsize:
- if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
- return -EINVAL;
- fsopt->rsize = ALIGN(intval, PAGE_SIZE);
+ if (result.uint_32 < PAGE_SIZE ||
+ result.uint_32 > CEPH_MAX_READ_SIZE)
+ goto out_of_range;
+ fsopt->rsize = ALIGN(result.uint_32, PAGE_SIZE);
break;
case Opt_rasize:
- if (intval < 0)
- return -EINVAL;
- fsopt->rasize = ALIGN(intval, PAGE_SIZE);
+ fsopt->rasize = ALIGN(result.uint_32, PAGE_SIZE);
break;
case Opt_caps_wanted_delay_min:
- if (intval < 1)
- return -EINVAL;
- fsopt->caps_wanted_delay_min = intval;
+ if (result.uint_32 < 1)
+ goto out_of_range;
+ fsopt->caps_wanted_delay_min = result.uint_32;
break;
case Opt_caps_wanted_delay_max:
- if (intval < 1)
- return -EINVAL;
- fsopt->caps_wanted_delay_max = intval;
+ if (result.uint_32 < 1)
+ goto out_of_range;
+ fsopt->caps_wanted_delay_max = result.uint_32;
break;
case Opt_caps_max:
- if (intval < 0)
- return -EINVAL;
- fsopt->caps_max = intval;
+ if (result.int_32 < 0)
+ goto out_of_range;
+ fsopt->caps_max = result.int_32;
break;
case Opt_readdir_max_entries:
- if (intval < 1)
- return -EINVAL;
- fsopt->max_readdir = intval;
+ if (result.uint_32 < 1)
+ goto out_of_range;
+ fsopt->max_readdir = result.uint_32;
break;
case Opt_readdir_max_bytes:
- if (intval < (int)PAGE_SIZE && intval != 0)
- return -EINVAL;
- fsopt->max_readdir_bytes = intval;
+ if (result.uint_32 < PAGE_SIZE && result.uint_32 != 0)
+ goto out_of_range;
+ fsopt->max_readdir_bytes = result.uint_32;
break;
case Opt_congestion_kb:
- if (intval < 1024) /* at least 1M */
- return -EINVAL;
- fsopt->congestion_kb = intval;
+ if (result.uint_32 < 1024) /* at least 1M */
+ goto out_of_range;
+ fsopt->congestion_kb = result.uint_32;
break;
case Opt_dirstat:
- fsopt->flags |= CEPH_MOUNT_OPT_DIRSTAT;
- break;
- case Opt_nodirstat:
- fsopt->flags &= ~CEPH_MOUNT_OPT_DIRSTAT;
+ if (!result.negated)
+ fsopt->flags |= CEPH_MOUNT_OPT_DIRSTAT;
+ else
+ fsopt->flags &= ~CEPH_MOUNT_OPT_DIRSTAT;
break;
case Opt_rbytes:
- fsopt->flags |= CEPH_MOUNT_OPT_RBYTES;
- break;
- case Opt_norbytes:
- fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
+ if (!result.negated)
+ fsopt->flags |= CEPH_MOUNT_OPT_RBYTES;
+ else
+ fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
break;
case Opt_asyncreaddir:
- fsopt->flags &= ~CEPH_MOUNT_OPT_NOASYNCREADDIR;
- break;
- case Opt_noasyncreaddir:
- fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_NOASYNCREADDIR;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
break;
case Opt_dcache:
- fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
- break;
- case Opt_nodcache:
- fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
+ if (!result.negated)
+ fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
+ else
+ fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
break;
case Opt_ino32:
- fsopt->flags |= CEPH_MOUNT_OPT_INO32;
- break;
- case Opt_noino32:
- fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
+ if (!result.negated)
+ fsopt->flags |= CEPH_MOUNT_OPT_INO32;
+ else
+ fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
break;
+
case Opt_fscache:
- fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
- kfree(fsopt->fscache_uniq);
- fsopt->fscache_uniq = NULL;
- break;
- case Opt_nofscache:
- fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
+#ifdef CONFIG_CEPH_FSCACHE
kfree(fsopt->fscache_uniq);
fsopt->fscache_uniq = NULL;
+ if (result.negated) {
+ fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
+ } else {
+ fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
+ fsopt->fscache_uniq = param->string;
+ param->string = NULL;
+ }
break;
+#else
+ return invalfc(fc, "fscache support is disabled");
+#endif
case Opt_poolperm:
- fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM;
- break;
- case Opt_nopoolperm:
- fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
break;
case Opt_require_active_mds:
- fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
- break;
- case Opt_norequire_active_mds:
- fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_MOUNTWAIT;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_MOUNTWAIT;
break;
case Opt_quotadf:
- fsopt->flags &= ~CEPH_MOUNT_OPT_NOQUOTADF;
- break;
- case Opt_noquotadf:
- fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_NOQUOTADF;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
break;
case Opt_copyfrom:
- fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
- break;
- case Opt_nocopyfrom:
- fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
+ if (!result.negated)
+ fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
+ else
+ fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
break;
-#ifdef CONFIG_CEPH_FS_POSIX_ACL
case Opt_acl:
- fsopt->sb_flags |= SB_POSIXACL;
- break;
+ if (!result.negated) {
+#ifdef CONFIG_CEPH_FS_POSIX_ACL
+ fc->sb_flags |= SB_POSIXACL;
+#else
+ return invalfc(fc, "POSIX ACL support is disabled");
#endif
- case Opt_noacl:
- fsopt->sb_flags &= ~SB_POSIXACL;
+ } else {
+ fc->sb_flags &= ~SB_POSIXACL;
+ }
break;
default:
- BUG_ON(token);
+ BUG();
}
return 0;
+
+out_of_range:
+ return invalfc(fc, "%s out of range", param->key);
}
static void destroy_mount_options(struct ceph_mount_options *args)
{
dout("destroy_mount_options %p\n", args);
+ if (!args)
+ return;
+
kfree(args->snapdir_name);
kfree(args->mds_namespace);
kfree(args->server_path);
@@ -424,12 +493,15 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
if (ret)
return ret;
+
ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
if (ret)
return ret;
+
ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
if (ret)
return ret;
+
ret = strcmp_null(fsopt1->fscache_uniq, fsopt2->fscache_uniq);
if (ret)
return ret;
@@ -437,91 +509,6 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,
return ceph_compare_options(new_opt, fsc->client);
}
-static int parse_mount_options(struct ceph_mount_options **pfsopt,
- struct ceph_options **popt,
- int flags, char *options,
- const char *dev_name)
-{
- struct ceph_mount_options *fsopt;
- const char *dev_name_end;
- int err;
-
- if (!dev_name || !*dev_name)
- return -EINVAL;
-
- fsopt = kzalloc(sizeof(*fsopt), GFP_KERNEL);
- if (!fsopt)
- return -ENOMEM;
-
- dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
-
- fsopt->sb_flags = flags;
- fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
-
- fsopt->wsize = CEPH_MAX_WRITE_SIZE;
- fsopt->rsize = CEPH_MAX_READ_SIZE;
- fsopt->rasize = CEPH_RASIZE_DEFAULT;
- fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
- if (!fsopt->snapdir_name) {
- err = -ENOMEM;
- goto out;
- }
-
- fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
- fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
- fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
- fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
- fsopt->congestion_kb = default_congestion_kb();
-
- /*
- * Distinguish the server list from the path in "dev_name".
- * Internally we do not include the leading '/' in the path.
- *
- * "dev_name" will look like:
- * <server_spec>[,<server_spec>...]:[<path>]
- * where
- * <server_spec> is <ip>[:<port>]
- * <path> is optional, but if present must begin with '/'
- */
- dev_name_end = strchr(dev_name, '/');
- if (dev_name_end) {
- if (strlen(dev_name_end) > 1) {
- fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
- if (!fsopt->server_path) {
- err = -ENOMEM;
- goto out;
- }
- }
- } else {
- dev_name_end = dev_name + strlen(dev_name);
- }
- err = -EINVAL;
- dev_name_end--; /* back up to ':' separator */
- if (dev_name_end < dev_name || *dev_name_end != ':') {
- pr_err("device name is missing path (no : separator in %s)\n",
- dev_name);
- goto out;
- }
- dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
- if (fsopt->server_path)
- dout("server path '%s'\n", fsopt->server_path);
-
- *popt = ceph_parse_options(options, dev_name, dev_name_end,
- parse_fsopt_token, (void *)fsopt);
- if (IS_ERR(*popt)) {
- err = PTR_ERR(*popt);
- goto out;
- }
-
- /* success */
- *pfsopt = fsopt;
- return 0;
-
-out:
- destroy_mount_options(fsopt);
- return err;
-}
-
/**
* ceph_show_options - Show mount options in /proc/mounts
* @m: seq_file to write to
@@ -565,7 +552,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noquotadf");
#ifdef CONFIG_CEPH_FS_POSIX_ACL
- if (fsopt->sb_flags & SB_POSIXACL)
+ if (root->d_sb->s_flags & SB_POSIXACL)
seq_puts(m, ",acl");
else
seq_puts(m, ",noacl");
@@ -576,26 +563,30 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->mds_namespace)
seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
+
+ if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
+ seq_show_option(m, "recover_session", "clean");
+
if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
- seq_printf(m, ",wsize=%d", fsopt->wsize);
+ seq_printf(m, ",wsize=%u", fsopt->wsize);
if (fsopt->rsize != CEPH_MAX_READ_SIZE)
- seq_printf(m, ",rsize=%d", fsopt->rsize);
+ seq_printf(m, ",rsize=%u", fsopt->rsize);
if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
- seq_printf(m, ",rasize=%d", fsopt->rasize);
+ seq_printf(m, ",rasize=%u", fsopt->rasize);
if (fsopt->congestion_kb != default_congestion_kb())
- seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+ seq_printf(m, ",write_congestion_kb=%u", fsopt->congestion_kb);
if (fsopt->caps_max)
seq_printf(m, ",caps_max=%d", fsopt->caps_max);
if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
- seq_printf(m, ",caps_wanted_delay_min=%d",
+ seq_printf(m, ",caps_wanted_delay_min=%u",
fsopt->caps_wanted_delay_min);
if (fsopt->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
- seq_printf(m, ",caps_wanted_delay_max=%d",
+ seq_printf(m, ",caps_wanted_delay_max=%u",
fsopt->caps_wanted_delay_max);
if (fsopt->max_readdir != CEPH_MAX_READDIR_DEFAULT)
- seq_printf(m, ",readdir_max_entries=%d", fsopt->max_readdir);
+ seq_printf(m, ",readdir_max_entries=%u", fsopt->max_readdir);
if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
- seq_printf(m, ",readdir_max_bytes=%d", fsopt->max_readdir_bytes);
+ seq_printf(m, ",readdir_max_bytes=%u", fsopt->max_readdir_bytes);
if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
seq_show_option(m, "snapdirname", fsopt->snapdir_name);
@@ -664,6 +655,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
fsc->sb = NULL;
fsc->mount_state = CEPH_MOUNT_MOUNTING;
+ fsc->filp_gen = 1;
+ fsc->have_copy_from2 = true;
atomic_long_set(&fsc->writeback_count, 0);
@@ -713,6 +706,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{
dout("destroy_fs_client %p\n", fsc);
+ ceph_mdsc_destroy(fsc);
destroy_workqueue(fsc->inode_wq);
destroy_workqueue(fsc->cap_wq);
@@ -814,7 +808,6 @@ static void destroy_caches(void)
ceph_fscache_unregister();
}
-
/*
* ceph_umount_begin - initiate forced umount. Tear down down the
* mount, skipping steps that may hang while waiting for server(s).
@@ -829,13 +822,7 @@ static void ceph_umount_begin(struct super_block *sb)
fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
ceph_mdsc_force_umount(fsc->mdsc);
- return;
-}
-
-static int ceph_remount(struct super_block *sb, int *flags, char *data)
-{
- sync_filesystem(sb);
- return 0;
+ fsc->filp_gen++; // invalidate open files
}
static const struct super_operations ceph_super_ops = {
@@ -846,7 +833,6 @@ static const struct super_operations ceph_super_ops = {
.evict_inode = ceph_evict_inode,
.sync_fs = ceph_sync_fs,
.put_super = ceph_put_super,
- .remount_fs = ceph_remount,
.show_options = ceph_show_options,
.statfs = ceph_statfs,
.umount_begin = ceph_umount_begin,
@@ -901,13 +887,11 @@ out:
return root;
}
-
-
-
/*
* mount: join the ceph cluster, and open root directory.
*/
-static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
+static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
+ struct fs_context *fc)
{
int err;
unsigned long started = jiffies; /* note the start time */
@@ -917,25 +901,21 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)
mutex_lock(&fsc->client->mount_mutex);
if (!fsc->sb->s_root) {
- const char *path;
+ const char *path = fsc->mount_options->server_path ?
+ fsc->mount_options->server_path + 1 : "";
+
err = __ceph_open_session(fsc->client, started);
if (err < 0)
goto out;
/* setup fscache */
if (fsc->mount_options->flags & CEPH_MOUNT_OPT_FSCACHE) {
- err = ceph_fscache_register_fs(fsc);
+ err = ceph_fscache_register_fs(fsc, fc);
if (err < 0)
goto out;
}
- if (!fsc->mount_options->server_path) {
- path = "";
- dout("mount opening path \\t\n");
- } else {
- path = fsc->mount_options->server_path + 1;
- dout("mount opening path %s\n", path);
- }
+ dout("mount opening path '%s'\n", path);
ceph_fs_debugfs_init(fsc);
@@ -959,18 +939,16 @@ out:
return ERR_PTR(err);
}
-static int ceph_set_super(struct super_block *s, void *data)
+static int ceph_set_super(struct super_block *s, struct fs_context *fc)
{
- struct ceph_fs_client *fsc = data;
+ struct ceph_fs_client *fsc = s->s_fs_info;
int ret;
- dout("set_super %p data %p\n", s, data);
+ dout("set_super %p\n", s);
- s->s_flags = fsc->mount_options->sb_flags;
s->s_maxbytes = MAX_LFS_FILESIZE;
s->s_xattr = ceph_xattr_handlers;
- s->s_fs_info = fsc;
fsc->sb = s;
fsc->max_file_size = 1ULL << 40; /* temp value until we get mdsmap */
@@ -979,25 +957,21 @@ static int ceph_set_super(struct super_block *s, void *data)
s->s_export_op = &ceph_export_ops;
s->s_time_gran = 1;
+ s->s_time_min = 0;
+ s->s_time_max = U32_MAX;
- ret = set_anon_super(s, NULL); /* what is that second arg for? */
+ ret = set_anon_super_fc(s, fc);
if (ret != 0)
- goto fail;
-
- return ret;
-
-fail:
- s->s_fs_info = NULL;
- fsc->sb = NULL;
+ fsc->sb = NULL;
return ret;
}
/*
* share superblock if same fs AND options
*/
-static int ceph_compare_super(struct super_block *sb, void *data)
+static int ceph_compare_super(struct super_block *sb, struct fs_context *fc)
{
- struct ceph_fs_client *new = data;
+ struct ceph_fs_client *new = fc->s_fs_info;
struct ceph_mount_options *fsopt = new->mount_options;
struct ceph_options *opt = new->client->options;
struct ceph_fs_client *other = ceph_sb_to_client(sb);
@@ -1013,7 +987,7 @@ static int ceph_compare_super(struct super_block *sb, void *data)
dout("fsid doesn't match\n");
return 0;
}
- if (fsopt->sb_flags != other->mount_options->sb_flags) {
+ if (fc->sb_flags != (sb->s_flags & ~SB_BORN)) {
dout("flags differ\n");
return 0;
}
@@ -1043,81 +1017,156 @@ static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
return 0;
}
-static struct dentry *ceph_mount(struct file_system_type *fs_type,
- int flags, const char *dev_name, void *data)
+static int ceph_get_tree(struct fs_context *fc)
{
+ struct ceph_parse_opts_ctx *pctx = fc->fs_private;
struct super_block *sb;
struct ceph_fs_client *fsc;
struct dentry *res;
+ int (*compare_super)(struct super_block *, struct fs_context *) =
+ ceph_compare_super;
int err;
- int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
- struct ceph_mount_options *fsopt = NULL;
- struct ceph_options *opt = NULL;
- dout("ceph_mount\n");
+ dout("ceph_get_tree\n");
-#ifdef CONFIG_CEPH_FS_POSIX_ACL
- flags |= SB_POSIXACL;
-#endif
- err = parse_mount_options(&fsopt, &opt, flags, data, dev_name);
- if (err < 0) {
- res = ERR_PTR(err);
- goto out_final;
- }
+ if (!fc->source)
+ return invalfc(fc, "No source");
/* create client (which we may/may not use) */
- fsc = create_fs_client(fsopt, opt);
+ fsc = create_fs_client(pctx->opts, pctx->copts);
+ pctx->opts = NULL;
+ pctx->copts = NULL;
if (IS_ERR(fsc)) {
- res = ERR_CAST(fsc);
+ err = PTR_ERR(fsc);
goto out_final;
}
err = ceph_mdsc_init(fsc);
- if (err < 0) {
- res = ERR_PTR(err);
+ if (err < 0)
goto out;
- }
if (ceph_test_opt(fsc->client, NOSHARE))
compare_super = NULL;
- sb = sget(fs_type, compare_super, ceph_set_super, flags, fsc);
+
+ fc->s_fs_info = fsc;
+ sb = sget_fc(fc, compare_super, ceph_set_super);
+ fc->s_fs_info = NULL;
if (IS_ERR(sb)) {
- res = ERR_CAST(sb);
+ err = PTR_ERR(sb);
goto out;
}
if (ceph_sb_to_client(sb) != fsc) {
- ceph_mdsc_destroy(fsc);
destroy_fs_client(fsc);
fsc = ceph_sb_to_client(sb);
dout("get_sb got existing client %p\n", fsc);
} else {
dout("get_sb using new client %p\n", fsc);
err = ceph_setup_bdi(sb, fsc);
- if (err < 0) {
- res = ERR_PTR(err);
+ if (err < 0)
goto out_splat;
- }
}
- res = ceph_real_mount(fsc);
- if (IS_ERR(res))
+ res = ceph_real_mount(fsc, fc);
+ if (IS_ERR(res)) {
+ err = PTR_ERR(res);
goto out_splat;
+ }
dout("root %p inode %p ino %llx.%llx\n", res,
d_inode(res), ceph_vinop(d_inode(res)));
- return res;
+ fc->root = fsc->sb->s_root;
+ return 0;
out_splat:
+ if (!ceph_mdsmap_is_cluster_available(fsc->mdsc->mdsmap)) {
+ pr_info("No mds server is up or the cluster is laggy\n");
+ err = -EHOSTUNREACH;
+ }
+
ceph_mdsc_close_sessions(fsc->mdsc);
deactivate_locked_super(sb);
goto out_final;
out:
- ceph_mdsc_destroy(fsc);
destroy_fs_client(fsc);
out_final:
- dout("ceph_mount fail %ld\n", PTR_ERR(res));
- return res;
+ dout("ceph_get_tree fail %d\n", err);
+ return err;
+}
+
+static void ceph_free_fc(struct fs_context *fc)
+{
+ struct ceph_parse_opts_ctx *pctx = fc->fs_private;
+
+ if (pctx) {
+ destroy_mount_options(pctx->opts);
+ ceph_destroy_options(pctx->copts);
+ kfree(pctx);
+ }
+}
+
+static int ceph_reconfigure_fc(struct fs_context *fc)
+{
+ sync_filesystem(fc->root->d_sb);
+ return 0;
+}
+
+static const struct fs_context_operations ceph_context_ops = {
+ .free = ceph_free_fc,
+ .parse_param = ceph_parse_mount_param,
+ .get_tree = ceph_get_tree,
+ .reconfigure = ceph_reconfigure_fc,
+};
+
+/*
+ * Set up the filesystem mount context.
+ */
+static int ceph_init_fs_context(struct fs_context *fc)
+{
+ struct ceph_parse_opts_ctx *pctx;
+ struct ceph_mount_options *fsopt;
+
+ pctx = kzalloc(sizeof(*pctx), GFP_KERNEL);
+ if (!pctx)
+ return -ENOMEM;
+
+ pctx->copts = ceph_alloc_options();
+ if (!pctx->copts)
+ goto nomem;
+
+ pctx->opts = kzalloc(sizeof(*pctx->opts), GFP_KERNEL);
+ if (!pctx->opts)
+ goto nomem;
+
+ fsopt = pctx->opts;
+ fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
+
+ fsopt->wsize = CEPH_MAX_WRITE_SIZE;
+ fsopt->rsize = CEPH_MAX_READ_SIZE;
+ fsopt->rasize = CEPH_RASIZE_DEFAULT;
+ fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
+ if (!fsopt->snapdir_name)
+ goto nomem;
+
+ fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
+ fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
+ fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
+ fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
+ fsopt->congestion_kb = default_congestion_kb();
+
+#ifdef CONFIG_CEPH_FS_POSIX_ACL
+ fc->sb_flags |= SB_POSIXACL;
+#endif
+
+ fc->fs_private = pctx;
+ fc->ops = &ceph_context_ops;
+ return 0;
+
+nomem:
+ destroy_mount_options(pctx->opts);
+ ceph_destroy_options(pctx->copts);
+ kfree(pctx);
+ return -ENOMEM;
}
static void ceph_kill_sb(struct super_block *s)
@@ -1137,8 +1186,6 @@ static void ceph_kill_sb(struct super_block *s)
ceph_fscache_unregister_fs(fsc);
- ceph_mdsc_destroy(fsc);
-
destroy_fs_client(fsc);
free_anon_bdev(dev);
}
@@ -1146,12 +1193,39 @@ static void ceph_kill_sb(struct super_block *s)
static struct file_system_type ceph_fs_type = {
.owner = THIS_MODULE,
.name = "ceph",
- .mount = ceph_mount,
+ .init_fs_context = ceph_init_fs_context,
.kill_sb = ceph_kill_sb,
.fs_flags = FS_RENAME_DOES_D_MOVE,
};
MODULE_ALIAS_FS("ceph");
+int ceph_force_reconnect(struct super_block *sb)
+{
+ struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
+ int err = 0;
+
+ ceph_umount_begin(sb);
+
+ /* Make sure all page caches get invalidated.
+ * see remove_session_caps_cb() */
+ flush_workqueue(fsc->inode_wq);
+
+ /* In case that we were blacklisted. This also reset
+ * all mon/osd connections */
+ ceph_reset_client_addr(fsc->client);
+
+ ceph_osdc_clear_abort_err(&fsc->client->osdc);
+
+ fsc->blacklisted = false;
+ fsc->mount_state = CEPH_MOUNT_MOUNTED;
+
+ if (sb->s_root) {
+ err = __ceph_do_getattr(d_inode(sb->s_root), NULL,
+ CEPH_STAT_CAP_INODE, true);
+ }
+ return err;
+}
+
static int __init init_ceph(void)
{
int ret = init_caches();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index d2352fd95dbc..037cdfb2ad4f 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -16,6 +16,7 @@
#include <linux/slab.h>
#include <linux/posix_acl.h>
#include <linux/refcount.h>
+#include <linux/security.h>
#include <linux/ceph/libceph.h>
@@ -31,6 +32,7 @@
#define CEPH_BLOCK_SHIFT 22 /* 4 MB */
#define CEPH_BLOCK (1 << CEPH_BLOCK_SHIFT)
+#define CEPH_MOUNT_OPT_CLEANRECOVER (1<<1) /* auto reonnect (clean mode) after blacklisted */
#define CEPH_MOUNT_OPT_DIRSTAT (1<<4) /* `cat dirname` for stats */
#define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */
#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
@@ -71,17 +73,16 @@
#define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT 60 /* cap release delay */
struct ceph_mount_options {
- int flags;
- int sb_flags;
-
- int wsize; /* max write size */
- int rsize; /* max read size */
- int rasize; /* max readahead */
- int congestion_kb; /* max writeback in flight */
- int caps_wanted_delay_min, caps_wanted_delay_max;
+ unsigned int flags;
+
+ unsigned int wsize; /* max write size */
+ unsigned int rsize; /* max read size */
+ unsigned int rasize; /* max readahead */
+ unsigned int congestion_kb; /* max writeback in flight */
+ unsigned int caps_wanted_delay_min, caps_wanted_delay_max;
int caps_max;
- int max_readdir; /* max readdir result (entires) */
- int max_readdir_bytes; /* max readdir result (bytes) */
+ unsigned int max_readdir; /* max readdir result (entries) */
+ unsigned int max_readdir_bytes; /* max readdir result (bytes) */
/*
* everything above this point can be memcmp'd; everything below
@@ -90,7 +91,7 @@ struct ceph_mount_options {
char *snapdir_name; /* default ".snap" */
char *mds_namespace; /* default NULL */
- char *server_path; /* default "/" */
+ char *server_path; /* default NULL (means "/") */
char *fscache_uniq; /* default NULL */
};
@@ -101,6 +102,13 @@ struct ceph_fs_client {
struct ceph_client *client;
unsigned long mount_state;
+
+ unsigned long last_auto_reconnect;
+ bool blacklisted;
+
+ bool have_copy_from2;
+
+ u32 filp_gen;
loff_t max_file_size;
struct ceph_mds_client *mdsc;
@@ -395,25 +403,31 @@ struct ceph_inode_info {
struct fscache_cookie *fscache;
u32 i_fscache_gen;
#endif
+ errseq_t i_meta_err;
+
struct inode vfs_inode; /* at end */
};
-static inline struct ceph_inode_info *ceph_inode(struct inode *inode)
+static inline struct ceph_inode_info *
+ceph_inode(const struct inode *inode)
{
return container_of(inode, struct ceph_inode_info, vfs_inode);
}
-static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode)
+static inline struct ceph_fs_client *
+ceph_inode_to_client(const struct inode *inode)
{
return (struct ceph_fs_client *)inode->i_sb->s_fs_info;
}
-static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb)
+static inline struct ceph_fs_client *
+ceph_sb_to_client(const struct super_block *sb)
{
return (struct ceph_fs_client *)sb->s_fs_info;
}
-static inline struct ceph_vino ceph_vino(struct inode *inode)
+static inline struct ceph_vino
+ceph_vino(const struct inode *inode)
{
return ceph_inode(inode)->i_vino;
}
@@ -499,17 +513,16 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_DIR_ORDERED (1 << 0) /* dentries in dir are ordered */
#define CEPH_I_NODELAY (1 << 1) /* do not delay cap release */
#define CEPH_I_FLUSH (1 << 2) /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH (1 << 3) /* do not flush dirty caps */
-#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */
-#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
-#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
-#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
-#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
-#define CEPH_I_KICK_FLUSH (1 << 9) /* kick flushing caps */
-#define CEPH_I_FLUSH_SNAPS (1 << 10) /* need flush snapss */
-#define CEPH_I_ERROR_WRITE (1 << 11) /* have seen write errors */
-#define CEPH_I_ERROR_FILELOCK (1 << 12) /* have seen file lock errors */
-
+#define CEPH_I_POOL_PERM (1 << 3) /* pool rd/wr bits are valid */
+#define CEPH_I_POOL_RD (1 << 4) /* can read from pool */
+#define CEPH_I_POOL_WR (1 << 5) /* can write to pool */
+#define CEPH_I_SEC_INITED (1 << 6) /* security initialized */
+#define CEPH_I_CAP_DROPPED (1 << 7) /* caps were forcibly dropped */
+#define CEPH_I_KICK_FLUSH (1 << 8) /* kick flushing caps */
+#define CEPH_I_FLUSH_SNAPS (1 << 9) /* need flush snapss */
+#define CEPH_I_ERROR_WRITE (1 << 10) /* have seen write errors */
+#define CEPH_I_ERROR_FILELOCK (1 << 11) /* have seen file lock errors */
+#define CEPH_I_ODIRECT (1 << 12) /* inode in direct I/O mode */
/*
* Masks of ceph inode work.
@@ -703,6 +716,10 @@ struct ceph_file_info {
spinlock_t rw_contexts_lock;
struct list_head rw_contexts;
+
+ errseq_t meta_err;
+ u32 filp_gen;
+ atomic_t num_locks;
};
struct ceph_dir_file_info {
@@ -842,7 +859,8 @@ static inline int default_congestion_kb(void)
}
-
+/* super.c */
+extern int ceph_force_reconnect(struct super_block *sb);
/* snap.c */
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
u64 ino);
@@ -926,7 +944,7 @@ extern int ceph_getattr(const struct path *path, struct kstat *stat,
int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
-extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
+extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci);
extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
extern const struct xattr_handler *ceph_xattr_handlers[];
@@ -959,7 +977,10 @@ static inline bool ceph_security_xattr_wanted(struct inode *in)
#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
extern int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
struct ceph_acl_sec_ctx *ctx);
-extern void ceph_security_invalidate_secctx(struct inode *inode);
+static inline void ceph_security_invalidate_secctx(struct inode *inode)
+{
+ security_inode_invalidate_secctx(inode);
+}
#else
static inline int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
struct ceph_acl_sec_ctx *ctx)
@@ -1039,7 +1060,6 @@ extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
int mds);
-extern int ceph_get_cap_mds(struct inode *inode);
extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
@@ -1058,9 +1078,9 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
struct inode *dir,
int mds, int drop, int unless);
-extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+extern int ceph_get_caps(struct file *filp, int need, int want,
loff_t endoff, int *got, struct page **pinned_page);
-extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+extern int ceph_try_get_caps(struct inode *inode,
int need, int want, bool nonblock, int *got);
/* for counting open files by mode */
@@ -1071,7 +1091,7 @@ extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
extern const struct address_space_operations ceph_aops;
extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
-extern int ceph_pool_perm_check(struct ceph_inode_info *ci, int need);
+extern int ceph_pool_perm_check(struct inode *inode, int need);
extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */
diff --git a/fs/ceph/util.c b/fs/ceph/util.c
new file mode 100644
index 000000000000..2c34875675bf
--- /dev/null
+++ b/fs/ceph/util.c
@@ -0,0 +1,100 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Some non-inline ceph helpers
+ */
+#include <linux/module.h>
+#include <linux/ceph/types.h>
+
+/*
+ * return true if @layout appears to be valid
+ */
+int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
+{
+ __u32 su = layout->stripe_unit;
+ __u32 sc = layout->stripe_count;
+ __u32 os = layout->object_size;
+
+ /* stripe unit, object size must be non-zero, 64k increment */
+ if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1)))
+ return 0;
+ if (!os || (os & (CEPH_MIN_STRIPE_UNIT-1)))
+ return 0;
+ /* object size must be a multiple of stripe unit */
+ if (os < su || os % su)
+ return 0;
+ /* stripe count must be non-zero */
+ if (!sc)
+ return 0;
+ return 1;
+}
+
+void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
+ struct ceph_file_layout_legacy *legacy)
+{
+ fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit);
+ fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count);
+ fl->object_size = le32_to_cpu(legacy->fl_object_size);
+ fl->pool_id = le32_to_cpu(legacy->fl_pg_pool);
+ if (fl->pool_id == 0 && fl->stripe_unit == 0 &&
+ fl->stripe_count == 0 && fl->object_size == 0)
+ fl->pool_id = -1;
+}
+
+void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
+ struct ceph_file_layout_legacy *legacy)
+{
+ legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit);
+ legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count);
+ legacy->fl_object_size = cpu_to_le32(fl->object_size);
+ if (fl->pool_id >= 0)
+ legacy->fl_pg_pool = cpu_to_le32(fl->pool_id);
+ else
+ legacy->fl_pg_pool = 0;
+}
+
+int ceph_flags_to_mode(int flags)
+{
+ int mode;
+
+#ifdef O_DIRECTORY /* fixme */
+ if ((flags & O_DIRECTORY) == O_DIRECTORY)
+ return CEPH_FILE_MODE_PIN;
+#endif
+
+ switch (flags & O_ACCMODE) {
+ case O_WRONLY:
+ mode = CEPH_FILE_MODE_WR;
+ break;
+ case O_RDONLY:
+ mode = CEPH_FILE_MODE_RD;
+ break;
+ case O_RDWR:
+ case O_ACCMODE: /* this is what the VFS does */
+ mode = CEPH_FILE_MODE_RDWR;
+ break;
+ }
+#ifdef O_LAZY
+ if (flags & O_LAZY)
+ mode |= CEPH_FILE_MODE_LAZY;
+#endif
+
+ return mode;
+}
+
+int ceph_caps_for_mode(int mode)
+{
+ int caps = CEPH_CAP_PIN;
+
+ if (mode & CEPH_FILE_MODE_RD)
+ caps |= CEPH_CAP_FILE_SHARED |
+ CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
+ if (mode & CEPH_FILE_MODE_WR)
+ caps |= CEPH_CAP_FILE_EXCL |
+ CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
+ CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
+ CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
+ if (mode & CEPH_FILE_MODE_LAZY)
+ caps |= CEPH_CAP_FILE_LAZYIO;
+
+ return caps;
+}
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 37b458a9af3a..7b8a070a782d 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -20,7 +20,8 @@ static int __remove_xattr(struct ceph_inode_info *ci,
static bool ceph_is_valid_xattr(const char *name)
{
- return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
+ return !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) ||
+ !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
!strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
!strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
}
@@ -654,7 +655,7 @@ static int __build_xattrs(struct inode *inode)
u32 len;
const char *name, *val;
struct ceph_inode_info *ci = ceph_inode(inode);
- int xattr_version;
+ u64 xattr_version;
struct ceph_inode_xattr **xattrs = NULL;
int err = 0;
int i;
@@ -754,12 +755,15 @@ static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
/*
* If there are dirty xattrs, reencode xattrs into the prealloc_blob
- * and swap into place.
+ * and swap into place. It returns the old i_xattrs.blob (or NULL) so
+ * that it can be freed by the caller as the i_ceph_lock is likely to be
+ * held.
*/
-void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
+struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci)
{
struct rb_node *p;
struct ceph_inode_xattr *xattr = NULL;
+ struct ceph_buffer *old_blob = NULL;
void *dest;
dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
@@ -790,12 +794,14 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
dest - ci->i_xattrs.prealloc_blob->vec.iov_base;
if (ci->i_xattrs.blob)
- ceph_buffer_put(ci->i_xattrs.blob);
+ old_blob = ci->i_xattrs.blob;
ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.dirty = false;
ci->i_xattrs.version++;
}
+
+ return old_blob;
}
static inline int __get_request_mask(struct inode *in) {
@@ -845,7 +851,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
req_mask = __get_request_mask(inode);
spin_lock(&ci->i_ceph_lock);
- dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
+ dout("getxattr %p name '%s' ver=%lld index_ver=%lld\n", inode, name,
ci->i_xattrs.version, ci->i_xattrs.index_version);
if (ci->i_xattrs.version == 0 ||
@@ -887,7 +893,8 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
memcpy(value, xattr->val, xattr->val_len);
if (current->journal_info &&
- !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+ !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) &&
+ security_ismaclabel(name + XATTR_SECURITY_PREFIX_LEN))
ci->i_ceph_flags |= CEPH_I_SEC_INITED;
out:
spin_unlock(&ci->i_ceph_lock);
@@ -898,11 +905,9 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
{
struct inode *inode = d_inode(dentry);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode);
bool len_only = (size == 0);
u32 namelen;
int err;
- int i;
spin_lock(&ci->i_ceph_lock);
dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
@@ -931,33 +936,6 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
names = __copy_xattr_names(ci, names);
size -= namelen;
}
-
-
- /* virtual xattr names, too */
- if (vxattrs) {
- for (i = 0; vxattrs[i].name; i++) {
- size_t this_len;
-
- if (vxattrs[i].flags & VXATTR_FLAG_HIDDEN)
- continue;
- if (vxattrs[i].exists_cb && !vxattrs[i].exists_cb(ci))
- continue;
-
- this_len = strlen(vxattrs[i].name) + 1;
- namelen += this_len;
- if (len_only)
- continue;
-
- if (this_len > size) {
- err = -ERANGE;
- goto out;
- }
-
- memcpy(names, vxattrs[i].name, this_len);
- names += this_len;
- size -= this_len;
- }
- }
err = namelen;
out:
spin_unlock(&ci->i_ceph_lock);
@@ -1036,6 +1014,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_cap_flush *prealloc_cf = NULL;
+ struct ceph_buffer *old_blob = NULL;
int issued;
int err;
int dirty = 0;
@@ -1099,7 +1078,8 @@ retry:
}
}
- dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
+ dout("setxattr %p name '%s' issued %s\n", inode, name,
+ ceph_cap_string(issued));
__build_xattrs(inode);
required_blob_size = __get_required_blob_size(ci, name_len, val_len);
@@ -1109,13 +1089,15 @@ retry:
struct ceph_buffer *blob;
spin_unlock(&ci->i_ceph_lock);
- dout(" preaallocating new blob size=%d\n", required_blob_size);
+ ceph_buffer_put(old_blob); /* Shouldn't be required */
+ dout(" pre-allocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
if (!blob)
goto do_sync_unlocked;
spin_lock(&ci->i_ceph_lock);
+ /* prealloc_blob can't be released while holding i_ceph_lock */
if (ci->i_xattrs.prealloc_blob)
- ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ old_blob = ci->i_xattrs.prealloc_blob;
ci->i_xattrs.prealloc_blob = blob;
goto retry;
}
@@ -1131,6 +1113,7 @@ retry:
}
spin_unlock(&ci->i_ceph_lock);
+ ceph_buffer_put(old_blob);
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
if (dirty)
@@ -1284,42 +1267,8 @@ out:
ceph_pagelist_release(pagelist);
return err;
}
-
-void ceph_security_invalidate_secctx(struct inode *inode)
-{
- security_inode_invalidate_secctx(inode);
-}
-
-static int ceph_xattr_set_security_label(const struct xattr_handler *handler,
- struct dentry *unused, struct inode *inode,
- const char *key, const void *buf,
- size_t buflen, int flags)
-{
- if (security_ismaclabel(key)) {
- const char *name = xattr_full_name(handler, key);
- return __ceph_setxattr(inode, name, buf, buflen, flags);
- }
- return -EOPNOTSUPP;
-}
-
-static int ceph_xattr_get_security_label(const struct xattr_handler *handler,
- struct dentry *unused, struct inode *inode,
- const char *key, void *buf, size_t buflen)
-{
- if (security_ismaclabel(key)) {
- const char *name = xattr_full_name(handler, key);
- return __ceph_getxattr(inode, name, buf, buflen);
- }
- return -EOPNOTSUPP;
-}
-
-static const struct xattr_handler ceph_security_label_handler = {
- .prefix = XATTR_SECURITY_PREFIX,
- .get = ceph_xattr_get_security_label,
- .set = ceph_xattr_set_security_label,
-};
-#endif
-#endif
+#endif /* CONFIG_CEPH_FS_SECURITY_LABEL */
+#endif /* CONFIG_SECURITY */
void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx)
{
@@ -1343,9 +1292,6 @@ const struct xattr_handler *ceph_xattr_handlers[] = {
&posix_acl_access_xattr_handler,
&posix_acl_default_xattr_handler,
#endif
-#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
- &ceph_security_label_handler,
-#endif
&ceph_other_xattr_handler,
NULL,
};
OpenPOWER on IntegriCloud