From 7627151ea30bce2051e3cb27d7bb2c30083f86a5 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 3 Feb 2016 21:24:49 +0800 Subject: libceph: define new ceph_file_layout structure Define new ceph_file_layout structure and rename old ceph_file_layout to ceph_file_layout_legacy. This is preparation for adding namespace to ceph_file_layout structure. Signed-off-by: Yan, Zheng --- fs/ceph/inode.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index f059b5997072..6c5903e0a976 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -814,10 +814,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { - if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) - ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; - ci->i_layout = info->layout; + s64 old_pool = ci->i_layout.pool_id; + ceph_file_layout_from_legacy(&ci->i_layout, &info->layout); ci->i_pool_ns_len = iinfo->pool_ns_len; + if (ci->i_layout.pool_id != old_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), -- cgit v1.2.3 From 30c156d9951e0aa88202707d80c583b0a09d3167 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 14 Feb 2016 11:24:31 +0800 Subject: libceph: rados pool namespace support Add pool namesapce pointer to struct ceph_file_layout and struct ceph_object_locator. Pool namespace is used by when mapping object to PG, it's also used when composing OSD request. The namespace pointer in struct ceph_file_layout is RCU protected. So libceph can read namespace without taking lock. Signed-off-by: Yan, Zheng [idryomov@gmail.com: ceph_oloc_destroy(), misc minor changes] Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 1 + fs/ceph/inode.c | 3 +++ include/linux/ceph/ceph_fs.h | 2 ++ include/linux/ceph/osdmap.h | 10 ++++----- net/ceph/debugfs.c | 12 ++++++++-- net/ceph/osd_client.c | 32 +++++++++++++++++++++------ net/ceph/osdmap.c | 52 +++++++++++++++++++++++++++++++++++++++----- 7 files changed, 92 insertions(+), 20 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cc272ed46cfd..58fd02d4e534 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3999,6 +3999,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.stripe_count = 1; rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER; rbd_dev->layout.pool_id = spec->pool_id; + RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); /* * If this is a mapping rbd_dev (as opposed to a parent one), diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6c5903e0a976..d035e0ab6029 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -446,6 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); + RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); ci->i_pool_ns_len = 0; ci->i_fragtree = RB_ROOT; @@ -570,6 +571,8 @@ void ceph_destroy_inode(struct inode *inode) if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); + ceph_put_string(ci->i_layout.pool_ns); + call_rcu(&inode->i_rcu, ceph_i_callback); } diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index e5a5fb9ca3f5..08b8fd72261e 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -53,6 +53,7 @@ struct ceph_file_layout_legacy { __le32 fl_pg_pool; /* namespace, crush ruleset, rep level */ } __attribute__ ((packed)); +struct ceph_string; /* * ceph_file_layout - describe data layout for a file/inode */ @@ -62,6 +63,7 @@ struct ceph_file_layout { u32 stripe_count; /* over this many objects */ u32 object_size; /* until objects are this big */ s64 pool_id; /* rados pool id */ + struct ceph_string __rcu *pool_ns; /* rados pool namespace */ }; extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout); diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 21d7f048959f..9a9041784dcf 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -63,11 +63,13 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool) struct ceph_object_locator { s64 pool; + struct ceph_string *pool_ns; }; static inline void ceph_oloc_init(struct ceph_object_locator *oloc) { oloc->pool = -1; + oloc->pool_ns = NULL; } static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc) @@ -75,11 +77,9 @@ static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc) return oloc->pool == -1; } -static inline void ceph_oloc_copy(struct ceph_object_locator *dest, - const struct ceph_object_locator *src) -{ - dest->pool = src->pool; -} +void ceph_oloc_copy(struct ceph_object_locator *dest, + const struct ceph_object_locator *src); +void ceph_oloc_destroy(struct ceph_object_locator *oloc); /* * Maximum supported by kernel client object name length diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index e77b04ca7802..c62b2b029a6e 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -156,8 +156,16 @@ static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t) seq_printf(s, "]/%d\t[", t->up.primary); for (i = 0; i < t->acting.size; i++) seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]); - seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary, - t->target_oid.name_len, t->target_oid.name, t->flags); + seq_printf(s, "]/%d\t", t->acting.primary); + if (t->target_oloc.pool_ns) { + seq_printf(s, "%*pE/%*pE\t0x%x", + (int)t->target_oloc.pool_ns->len, + t->target_oloc.pool_ns->str, + t->target_oid.name_len, t->target_oid.name, t->flags); + } else { + seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len, + t->target_oid.name, t->flags); + } if (t->paused) seq_puts(s, "\tP"); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 23efcac80072..b68cc15e9cdc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest, static void target_destroy(struct ceph_osd_request_target *t) { ceph_oid_destroy(&t->base_oid); + ceph_oloc_destroy(&t->base_oloc); ceph_oid_destroy(&t->target_oid); + ceph_oloc_destroy(&t->target_oloc); } /* @@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } EXPORT_SYMBOL(ceph_osdc_alloc_request); +static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc) +{ + return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); +} + int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) { struct ceph_osd_client *osdc = req->r_osdc; @@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) int msg_size; WARN_ON(ceph_oid_empty(&req->r_base_oid)); + WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); /* create request message */ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ - msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += CEPH_ENCODING_START_BLK_LEN + + ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pgid */ msg_size += 4 + req->r_base_oid.name_len; /* oid */ msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); @@ -949,6 +958,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; + req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); req->r_snapid = vino.snap; @@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) p += sizeof(req->r_replay_version); /* oloc */ - ceph_encode_8(&p, 4); - ceph_encode_8(&p, 4); - ceph_encode_32(&p, 8 + 4 + 4); + ceph_start_encoding(&p, 5, 4, + ceph_oloc_encoding_size(&req->r_t.target_oloc)); ceph_encode_64(&p, req->r_t.target_oloc.pool); ceph_encode_32(&p, -1); /* preferred */ ceph_encode_32(&p, 0); /* key len */ + if (req->r_t.target_oloc.pool_ns) + ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str, + req->r_t.target_oloc.pool_ns->len); + else + ceph_encode_32(&p, 0); /* pgid */ ceph_encode_8(&p, 1); @@ -2596,8 +2610,8 @@ static int ceph_oloc_decode(void **p, void *end, if (struct_v >= 5) { len = ceph_decode_32(p); if (len > 0) { - pr_warn("ceph_object_locator::nspace is set\n"); - goto e_inval; + ceph_decode_need(p, end, len, e_inval); + *p += len; } } @@ -2835,7 +2849,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) unlink_request(osd, req); mutex_unlock(&osd->lock); - ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc); + /* + * Not ceph_oloc_copy() - changing pool_ns is not + * supported. + */ + req->r_t.target_oloc.pool = m.redirect.oloc.pool; req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; req->r_tid = 0; __submit_request(req, false); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 5947b2e9fb8e..d2436880b305 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1510,6 +1510,24 @@ bad: return ERR_PTR(err); } +void ceph_oloc_copy(struct ceph_object_locator *dest, + const struct ceph_object_locator *src) +{ + WARN_ON(!ceph_oloc_empty(dest)); + WARN_ON(dest->pool_ns); /* empty() only covers ->pool */ + + dest->pool = src->pool; + if (src->pool_ns) + dest->pool_ns = ceph_get_string(src->pool_ns); +} +EXPORT_SYMBOL(ceph_oloc_copy); + +void ceph_oloc_destroy(struct ceph_object_locator *oloc) +{ + ceph_put_string(oloc->pool_ns); +} +EXPORT_SYMBOL(ceph_oloc_destroy); + void ceph_oid_copy(struct ceph_object_id *dest, const struct ceph_object_id *src) { @@ -1844,12 +1862,34 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, if (!pi) return -ENOENT; - raw_pgid->pool = oloc->pool; - raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, - oid->name_len); - - dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, - raw_pgid->pool, raw_pgid->seed); + if (!oloc->pool_ns) { + raw_pgid->pool = oloc->pool; + raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, + oid->name_len); + dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, + raw_pgid->pool, raw_pgid->seed); + } else { + char stack_buf[256]; + char *buf = stack_buf; + int nsl = oloc->pool_ns->len; + size_t total = nsl + 1 + oid->name_len; + + if (total > sizeof(stack_buf)) { + buf = kmalloc(total, GFP_NOIO); + if (!buf) + return -ENOMEM; + } + memcpy(buf, oloc->pool_ns->str, nsl); + buf[nsl] = '\037'; + memcpy(buf + nsl + 1, oid->name, oid->name_len); + raw_pgid->pool = oloc->pool; + raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total); + if (buf != stack_buf) + kfree(buf); + dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__, + oid->name, nsl, oloc->pool_ns->str, + raw_pgid->pool, raw_pgid->seed); + } return 0; } EXPORT_SYMBOL(ceph_object_locator_to_pg); -- cgit v1.2.3 From 779fe0fb8e1883d5c479ac6bd85fbd237deed1f7 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 7 Mar 2016 09:35:06 +0800 Subject: ceph: rados pool namespace support This patch adds codes that decode pool namespace information in cap message and request reply. Pool namespace is saved in i_layout, it will be passed to libceph when doing read/write. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 67 +++++++++++++++++++++++++++++++++++---------- fs/ceph/caps.c | 46 +++++++++++++++++-------------- fs/ceph/inode.c | 20 +++++++++++--- fs/ceph/ioctl.c | 3 ++ fs/ceph/mds_client.c | 19 +++++-------- fs/ceph/mds_client.h | 3 ++ fs/ceph/super.h | 1 - fs/ceph/xattr.c | 77 +++++++++++++++++++++++++++++++++++----------------- 8 files changed, 159 insertions(+), 77 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3f8efd866fec..d5b6f959a3c3 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1730,7 +1730,8 @@ enum { POOL_WRITE = 2, }; -static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, + s64 pool, struct ceph_string *pool_ns) { struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1738,6 +1739,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) struct rb_node **p, *parent; struct ceph_pool_perm *perm; struct page **pages; + size_t pool_ns_len; int err = 0, err2 = 0, have = 0; down_read(&mdsc->pool_perm_rwsem); @@ -1749,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) else if (pool > perm->pool) p = &(*p)->rb_right; else { - have = perm->perm; - break; + int ret = ceph_compare_string(pool_ns, + perm->pool_ns, + perm->pool_ns_len); + if (ret < 0) + p = &(*p)->rb_left; + else if (ret > 0) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } } } up_read(&mdsc->pool_perm_rwsem); if (*p) goto out; - dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); + if (pool_ns) + dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n", + pool, (int)pool_ns->len, pool_ns->str); + else + dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool); down_write(&mdsc->pool_perm_rwsem); + p = &mdsc->pool_perm_tree.rb_node; parent = NULL; while (*p) { parent = *p; @@ -1769,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) else if (pool > perm->pool) p = &(*p)->rb_right; else { - have = perm->perm; - break; + int ret = ceph_compare_string(pool_ns, + perm->pool_ns, + perm->pool_ns_len); + if (ret < 0) + p = &(*p)->rb_left; + else if (ret > 0) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } } } if (*p) { @@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) rd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); rd_req->r_base_oloc.pool = pool; + if (pool_ns) + rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns); ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); @@ -1841,7 +1868,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) goto out_unlock; } - perm = kmalloc(sizeof(*perm), GFP_NOFS); + pool_ns_len = pool_ns ? pool_ns->len : 0; + perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS); if (!perm) { err = -ENOMEM; goto out_unlock; @@ -1849,6 +1877,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool) perm->pool = pool; perm->perm = have; + perm->pool_ns_len = pool_ns_len; + if (pool_ns_len > 0) + memcpy(perm->pool_ns, pool_ns->str, pool_ns_len); + perm->pool_ns[pool_ns_len] = 0; + rb_link_node(&perm->node, parent, p); rb_insert_color(&perm->node, &mdsc->pool_perm_tree); err = 0; @@ -1860,19 +1893,20 @@ out_unlock: out: if (!err) err = have; - dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); + if (pool_ns) + dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n", + pool, (int)pool_ns->len, pool_ns->str, err); + else + dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err); return err; } int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) { s64 pool; + struct ceph_string *pool_ns; int ret, flags; - /* does not support pool namespace yet */ - if (ci->i_pool_ns_len) - return -EIO; - if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), NOPOOLPERM)) return 0; @@ -1896,7 +1930,9 @@ check: return 0; } - ret = __ceph_pool_perm_get(ci, pool); + pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); + ret = __ceph_pool_perm_get(ci, pool, pool_ns); + ceph_put_string(pool_ns); if (ret < 0) return ret; @@ -1907,8 +1943,9 @@ check: flags |= CEPH_I_POOL_WR; spin_lock(&ci->i_ceph_lock); - if (pool == ci->i_layout.pool_id) { - ci->i_ceph_flags = flags; + if (pool == ci->i_layout.pool_id && + pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) { + ci->i_ceph_flags |= flags; } else { pool = ci->i_layout.pool_id; flags = ci->i_ceph_flags; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f24722dce167..0a9406a8a794 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode) */ static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, - u64 inline_version, - void *inline_data, int inline_len, + struct ceph_string **pns, u64 inline_version, + void *inline_data, u32 inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, - struct ceph_cap *cap, int issued, - u32 pool_ns_len) + struct ceph_cap *cap, int issued) __releases(ci->i_ceph_lock) __releases(mdsc->snap_rwsem) { @@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { /* file layout may have changed */ s64 old_pool = ci->i_layout.pool_id; + struct ceph_string *old_ns; + ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); - ci->i_pool_ns_len = pool_ns_len; - if (ci->i_layout.pool_id != old_pool) + old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, + lockdep_is_held(&ci->i_ceph_lock)); + rcu_assign_pointer(ci->i_layout.pool_ns, *pns); + + if (ci->i_layout.pool_id != old_pool || *pns != old_ns) ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; + *pns = old_ns; + /* size/truncate_seq? */ queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(grant->truncate_seq), @@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_cap *cap; struct ceph_mds_caps *h; struct ceph_mds_cap_peer *peer = NULL; - struct ceph_snap_realm *realm; + struct ceph_snap_realm *realm = NULL; + struct ceph_string *pool_ns = NULL; int mds = session->s_mds; int op, issued; u32 seq, mseq; struct ceph_vino vino; - u64 cap_id; - u64 size, max_size; u64 tid; u64 inline_version = 0; void *inline_data = NULL; u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; - u32 pool_ns_len = 0; void *p, *end; dout("handle_caps from mds%d\n", mds); @@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, op = le32_to_cpu(h->op); vino.ino = le64_to_cpu(h->ino); vino.snap = CEPH_NOSNAP; - cap_id = le64_to_cpu(h->cap_id); seq = le32_to_cpu(h->seq); mseq = le32_to_cpu(h->migrate_seq); - size = le64_to_cpu(h->size); - max_size = le64_to_cpu(h->max_size); snaptrace = h + 1; snaptrace_len = le32_to_cpu(h->snap_trace_len); @@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 flush_tid; u32 caller_uid, caller_gid; u32 osd_epoch_barrier; + u32 pool_ns_len; /* version >= 5 */ ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad); /* version >= 6 */ @@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_decode_32_safe(&p, end, caller_gid, bad); /* version >= 8 */ ceph_decode_32_safe(&p, end, pool_ns_len, bad); + if (pool_ns_len > 0) { + ceph_decode_need(&p, end, pool_ns_len, bad); + pool_ns = ceph_find_or_create_string(p, pool_ns_len); + p += pool_ns_len; + } } /* lookup ino */ @@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap = ceph_get_cap(mdsc, NULL); cap->cap_ino = vino.ino; cap->queue_release = 1; - cap->cap_id = cap_id; + cap->cap_id = le64_to_cpu(h->cap_id); cap->mseq = mseq; cap->seq = seq; spin_lock(&session->s_cap_lock); @@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, } handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); if (realm) ceph_put_snap_realm(mdsc, realm); goto done_unlocked; @@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, + handle_cap_grant(mdsc, inode, h, &pool_ns, inline_version, inline_data, inline_len, - msg->middle, session, cap, issued, - pool_ns_len); + msg->middle, session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: @@ -3616,6 +3621,7 @@ done: mutex_unlock(&session->s_mutex); done_unlocked: iput(inode); + ceph_put_string(pool_ns); return; bad: diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index d035e0ab6029..dc032566ed71 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL); - ci->i_pool_ns_len = 0; ci->i_fragtree = RB_ROOT; mutex_init(&ci->i_fragtree_mutex); @@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode) if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); - ceph_put_string(ci->i_layout.pool_ns); + ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns)); call_rcu(&inode->i_rcu, ceph_i_callback); } @@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, int issued = 0, implemented, new_issued; struct timespec mtime, atime, ctime; struct ceph_buffer *xattr_blob = NULL; + struct ceph_string *pool_ns = NULL; struct ceph_cap *new_cap = NULL; int err = 0; bool wake = false; @@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, iinfo->xattr_len); } + if (iinfo->pool_ns_len > 0) + pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data, + iinfo->pool_ns_len); + spin_lock(&ci->i_ceph_lock); /* @@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { s64 old_pool = ci->i_layout.pool_id; + struct ceph_string *old_ns; + ceph_file_layout_from_legacy(&ci->i_layout, &info->layout); - ci->i_pool_ns_len = iinfo->pool_ns_len; - if (ci->i_layout.pool_id != old_pool) + old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, + lockdep_is_held(&ci->i_ceph_lock)); + rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns); + + if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns) ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; + pool_ns = old_ns; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), @@ -989,6 +1000,7 @@ out: ceph_put_cap(mdsc, new_cap); if (xattr_blob) ceph_buffer_put(xattr_blob); + ceph_put_string(pool_ns); return err; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 843dd31a02cd..6a30101b55ef 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) ceph_ino(inode), dl.object_no); oloc.pool = ci->i_layout.pool_id; + oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); ceph_oid_printf(&oid, "%s", dl.object_name); r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid); + + ceph_oloc_destroy(&oloc); if (r < 0) { up_read(&osdc->lock); return r; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 2103b823bec0..46641bbc8056 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -100,12 +100,15 @@ static int parse_reply_info_in(void **p, void *end, } else info->inline_version = CEPH_INLINE_NONE; + info->pool_ns_len = 0; + info->pool_ns_data = NULL; if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { ceph_decode_32_safe(p, end, info->pool_ns_len, bad); - ceph_decode_need(p, end, info->pool_ns_len, bad); - *p += info->pool_ns_len; - } else { - info->pool_ns_len = 0; + if (info->pool_ns_len > 0) { + ceph_decode_need(p, end, info->pool_ns_len, bad); + info->pool_ns_data = *p; + *p += info->pool_ns_len; + } } return 0; @@ -2292,14 +2295,6 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - /* deny access to directories with pool_ns layouts */ - if (req->r_inode && S_ISDIR(req->r_inode->i_mode) && - ceph_inode(req->r_inode)->i_pool_ns_len) - return -EIO; - if (req->r_locked_dir && - ceph_inode(req->r_locked_dir)->i_pool_ns_len) - return -EIO; - /* issue */ mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 75ecf967d0b7..2ce8e9f9bfc9 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -45,6 +45,7 @@ struct ceph_mds_reply_info_in { u32 inline_len; char *inline_data; u32 pool_ns_len; + char *pool_ns_data; }; struct ceph_mds_reply_dir_entry { @@ -277,6 +278,8 @@ struct ceph_pool_perm { struct rb_node node; int perm; s64 pool; + size_t pool_ns_len; + char pool_ns[]; }; /* diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 0168b49fb6ad..7ceab18c8ee2 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -287,7 +287,6 @@ struct ceph_inode_info { struct ceph_dir_layout i_dir_layout; struct ceph_file_layout i_layout; - size_t i_pool_ns_len; char *i_symlink; /* for dirs */ diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 5377c9c7a0c5..adc231892b0d 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -57,56 +57,69 @@ struct ceph_vxattr { static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) { - size_t s; - char *p = (char *)&ci->i_layout; - - for (s = 0; s < sizeof(ci->i_layout); s++, p++) - if (*p) - return true; - return false; + struct ceph_file_layout *fl = &ci->i_layout; + return (fl->stripe_unit > 0 || fl->stripe_count > 0 || + fl->object_size > 0 || fl->pool_id >= 0 || + rcu_dereference_raw(fl->pool_ns) != NULL); } static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val, size_t size) { - int ret; struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb); struct ceph_osd_client *osdc = &fsc->client->osdc; + struct ceph_string *pool_ns; s64 pool = ci->i_layout.pool_id; const char *pool_name; + const char *ns_field = " pool_namespace="; char buf[128]; + size_t len, total_len = 0; + int ret; + + pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); down_read(&osdc->lock); pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool); if (pool_name) { - size_t len = strlen(pool_name); - ret = snprintf(buf, sizeof(buf), + len = snprintf(buf, sizeof(buf), "stripe_unit=%u stripe_count=%u object_size=%u pool=", ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ci->i_layout.object_size); - if (!size) { - ret += len; - } else if (ret + len > size) { - ret = -ERANGE; - } else { - memcpy(val, buf, ret); - memcpy(val + ret, pool_name, len); - ret += len; - } + total_len = len + strlen(pool_name); } else { - ret = snprintf(buf, sizeof(buf), + len = snprintf(buf, sizeof(buf), "stripe_unit=%u stripe_count=%u object_size=%u pool=%lld", ci->i_layout.stripe_unit, ci->i_layout.stripe_count, ci->i_layout.object_size, (unsigned long long)pool); - if (size) { - if (ret <= size) - memcpy(val, buf, ret); - else - ret = -ERANGE; + total_len = len; + } + + if (pool_ns) + total_len += strlen(ns_field) + pool_ns->len; + + if (!size) { + ret = total_len; + } else if (total_len > size) { + ret = -ERANGE; + } else { + memcpy(val, buf, len); + ret = len; + if (pool_name) { + len = strlen(pool_name); + memcpy(val + ret, pool_name, len); + ret += len; + } + if (pool_ns) { + len = strlen(ns_field); + memcpy(val + ret, ns_field, len); + ret += len; + memcpy(val + ret, pool_ns->str, pool_ns->len); + ret += pool_ns->len; } } up_read(&osdc->lock); + ceph_put_string(pool_ns); return ret; } @@ -147,6 +160,18 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci, return ret; } +static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci, + char *val, size_t size) +{ + int ret = 0; + struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns); + if (ns) { + ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str); + ceph_put_string(ns); + } + return ret; +} + /* directories */ static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val, @@ -235,6 +260,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_LAYOUT_FIELD(dir, layout, stripe_count), XATTR_LAYOUT_FIELD(dir, layout, object_size), XATTR_LAYOUT_FIELD(dir, layout, pool), + XATTR_LAYOUT_FIELD(dir, layout, pool_namespace), XATTR_NAME_CEPH(dir, entries), XATTR_NAME_CEPH(dir, files), XATTR_NAME_CEPH(dir, subdirs), @@ -262,6 +288,7 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { XATTR_LAYOUT_FIELD(file, layout, stripe_count), XATTR_LAYOUT_FIELD(file, layout, object_size), XATTR_LAYOUT_FIELD(file, layout, pool), + XATTR_LAYOUT_FIELD(file, layout, pool_namespace), { .name = NULL, 0 } /* Required table terminator */ }; static size_t ceph_file_vxattrs_name_size; /* total size of all names */ -- cgit v1.2.3 From 774a6a118c70f8c11fcfe636032b5016ad71a746 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 6 Jun 2016 16:01:39 +0800 Subject: ceph: reduce i_nr_by_mode array size Track usage count for individual fmode bit. This can reduce the array size by half. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 45 ++++++++++++++++++++++++++++++-------------- fs/ceph/inode.c | 2 +- fs/ceph/ioctl.c | 3 +-- fs/ceph/super.h | 7 ++----- include/linux/ceph/ceph_fs.h | 2 +- 5 files changed, 36 insertions(+), 23 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0a9406a8a794..a08d245f16f5 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -849,12 +849,14 @@ int __ceph_caps_used(struct ceph_inode_info *ci) */ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) { - int want = 0; - int mode; - for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++) - if (ci->i_nr_by_mode[mode]) - want |= ceph_caps_for_mode(mode); - return want; + int i, bits = 0; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (ci->i_nr_by_mode[i]) + bits |= 1 << i; + } + if (bits == 0) + return 0; + return ceph_caps_for_mode(bits >> 1); } /* @@ -3682,6 +3684,16 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) dout("flush_dirty_caps done\n"); } +void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode) +{ + int i; + int bits = (fmode << 1) | 1; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) + ci->i_nr_by_mode[i]++; + } +} + /* * Drop open file reference. If we were the last open file, * we may need to release capabilities to the MDS (or schedule @@ -3689,15 +3701,20 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) */ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) { - struct inode *inode = &ci->vfs_inode; - int last = 0; - + int i, last = 0; + int bits = (fmode << 1) | 1; spin_lock(&ci->i_ceph_lock); - dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode, - ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1); - BUG_ON(ci->i_nr_by_mode[fmode] == 0); - if (--ci->i_nr_by_mode[fmode] == 0) - last++; + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { + if (bits & (1 << i)) { + BUG_ON(ci->i_nr_by_mode[i] == 0); + if (--ci->i_nr_by_mode[i] == 0) + last++; + } + } + dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n", + &ci->vfs_inode, fmode, + ci->i_nr_by_mode[0], ci->i_nr_by_mode[1], + ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]); spin_unlock(&ci->i_ceph_lock); if (last && ci->i_vino.snap == CEPH_NOSNAP) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index dc032566ed71..8ca843371d4b 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -477,7 +477,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_head_snapc = NULL; ci->i_snap_caps = 0; - for (i = 0; i < CEPH_FILE_MODE_NUM; i++) + for (i = 0; i < CEPH_FILE_MODE_BITS; i++) ci->i_nr_by_mode[i] = 0; mutex_init(&ci->i_truncate_mutex); diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 6a30101b55ef..7d752d53353a 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -250,9 +250,8 @@ static long ceph_ioctl_lazyio(struct file *file) if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) { spin_lock(&ci->i_ceph_lock); - ci->i_nr_by_mode[fi->fmode]--; fi->fmode |= CEPH_FILE_MODE_LAZY; - ci->i_nr_by_mode[fi->fmode]++; + ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++; spin_unlock(&ci->i_ceph_lock); dout("ioctl_layzio: file %p marked lazy\n", file); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7ceab18c8ee2..50846e6f6a8c 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -321,7 +321,7 @@ struct ceph_inode_info { dirty|flushing caps */ unsigned i_snap_caps; /* cap bits for snapped files */ - int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ + int i_nr_by_mode[CEPH_FILE_MODE_BITS]; /* open file counts */ struct mutex i_truncate_mutex; u32 i_truncate_seq; /* last truncate to smaller size */ @@ -906,10 +906,7 @@ extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); /* for counting open files by mode */ -static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) -{ - ci->i_nr_by_mode[mode]++; -} +extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode); /* addr.c */ diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 08b8fd72261e..6bc7730d22ac 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -525,7 +525,7 @@ struct ceph_filelock { #define CEPH_FILE_MODE_WR 2 #define CEPH_FILE_MODE_RDWR 3 /* RD | WR */ #define CEPH_FILE_MODE_LAZY 4 /* lazy io */ -#define CEPH_FILE_MODE_NUM 8 /* bc these are bit fields.. mostly */ +#define CEPH_FILE_MODE_BITS 4 int ceph_flags_to_mode(int flags); -- cgit v1.2.3 From 9a5530c63889ac928a45c4645ab0bc23b4fbfcb8 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 15 Jun 2016 16:29:18 +0800 Subject: ceph: wait unsafe sync writes for evicting inode Otherwise ceph_sync_write_unsafe() may access/modify freed inode. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 50 ++------------------------------------------------ fs/ceph/file.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ fs/ceph/inode.c | 8 ++++++++ fs/ceph/super.c | 1 + fs/ceph/super.h | 2 ++ 5 files changed, 61 insertions(+), 48 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a08d245f16f5..1e48377f18a7 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1927,53 +1927,6 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid) return ret; } -/* - * Wait on any unsafe replies for the given inode. First wait on the - * newest request, and make that the upper bound. Then, if there are - * more requests, keep waiting on the oldest as long as it is still older - * than the original request. - */ -static void sync_write_wait(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_writes; - struct ceph_osd_request *req; - u64 last_tid; - - if (!S_ISREG(inode->i_mode)) - return; - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - /* set upper bound as _last_ entry in chain */ - req = list_last_entry(head, struct ceph_osd_request, - r_unsafe_item); - last_tid = req->r_tid; - - do { - ceph_osdc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - dout("sync_write_wait on tid %llu (until %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_safe_completion); - spin_lock(&ci->i_unsafe_lock); - ceph_osdc_put_request(req); - - /* - * from here on look at first entry in chain, since we - * only want to wait for anything older than last_tid - */ - if (list_empty(head)) - break; - req = list_first_entry(head, struct ceph_osd_request, - r_unsafe_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); -} - /* * wait for any unsafe requests to complete. */ @@ -2026,7 +1979,8 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int dirty; dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); - sync_write_wait(inode); + + ceph_sync_write_wait(inode); ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 033e88753875..7f2ef262cdf7 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -821,6 +821,54 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } +/* + * Wait on any unsafe replies for the given inode. First wait on the + * newest request, and make that the upper bound. Then, if there are + * more requests, keep waiting on the oldest as long as it is still older + * than the original request. + */ +void ceph_sync_write_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_writes; + struct ceph_osd_request *req; + u64 last_tid; + + if (!S_ISREG(inode->i_mode)) + return; + + spin_lock(&ci->i_unsafe_lock); + if (list_empty(head)) + goto out; + + /* set upper bound as _last_ entry in chain */ + + req = list_last_entry(head, struct ceph_osd_request, + r_unsafe_item); + last_tid = req->r_tid; + + do { + ceph_osdc_get_request(req); + spin_unlock(&ci->i_unsafe_lock); + + dout("sync_write_wait on tid %llu (until %llu)\n", + req->r_tid, last_tid); + wait_for_completion(&req->r_safe_completion); + ceph_osdc_put_request(req); + + spin_lock(&ci->i_unsafe_lock); + /* + * from here on look at first entry in chain, since we + * only want to wait for anything older than last_tid + */ + if (list_empty(head)) + break; + req = list_first_entry(head, struct ceph_osd_request, + r_unsafe_item); + } while (req->r_tid < last_tid); +out: + spin_unlock(&ci->i_unsafe_lock); +} static ssize_t ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 8ca843371d4b..6e16269277bd 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -585,6 +585,14 @@ int ceph_drop_inode(struct inode *inode) return 1; } +void ceph_evict_inode(struct inode *inode) +{ + /* wait unsafe sync writes */ + ceph_sync_write_wait(inode); + truncate_inode_pages_final(&inode->i_data); + clear_inode(inode); +} + static inline blkcnt_t calc_inode_blocks(u64 size) { return (size + (1<<9) - 1) >> 9; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 91e02481ce06..a5b2275e1573 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -731,6 +731,7 @@ static const struct super_operations ceph_super_ops = { .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, .drop_inode = ceph_drop_inode, + .evict_inode = ceph_evict_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 50846e6f6a8c..d5b9077467a4 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -749,6 +749,7 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); extern int ceph_drop_inode(struct inode *inode); +extern void ceph_evict_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -927,6 +928,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); +extern void ceph_sync_write_wait(struct inode *inode); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_snapdir_fops; -- cgit v1.2.3 From 9b16f03c474d05b16cbd9eed1ec335c6e71cb57b Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Wed, 22 Jun 2016 16:35:04 +0200 Subject: ceph: don't use ->d_time Pretty simple: just use ceph_dentry_info.time instead (which was already there, unused). Signed-off-by: Miklos Szeredi --- fs/ceph/dir.c | 6 +++--- fs/ceph/inode.c | 4 ++-- fs/ceph/mds_client.c | 4 ++-- fs/ceph/super.h | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 6e0fedf6713b..8ff7bcc7fc88 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -59,7 +59,7 @@ int ceph_init_dentry(struct dentry *dentry) di->dentry = dentry; di->lease_session = NULL; - dentry->d_time = jiffies; + di->time = jiffies; /* avoid reordering d_fsdata setup so that the check above is safe */ smp_mb(); dentry->d_fsdata = di; @@ -1124,7 +1124,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, void ceph_invalidate_dentry_lease(struct dentry *dentry) { spin_lock(&dentry->d_lock); - dentry->d_time = jiffies; + ceph_dentry(dentry)->time = jiffies; ceph_dentry(dentry)->lease_shared_gen = 0; spin_unlock(&dentry->d_lock); } @@ -1154,7 +1154,7 @@ static int dentry_lease_is_valid(struct dentry *dentry) spin_unlock(&s->s_gen_ttl_lock); if (di->lease_gen == gen && - time_before(jiffies, dentry->d_time) && + time_before(jiffies, di->time) && time_before(jiffies, ttl)) { valid = 1; if (di->lease_renew_after && diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6e16269277bd..a38b768bc158 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1042,7 +1042,7 @@ static void update_dentry_lease(struct dentry *dentry, goto out_unlock; if (di->lease_gen == session->s_cap_gen && - time_before(ttl, dentry->d_time)) + time_before(ttl, di->time)) goto out_unlock; /* we already have a newer lease. */ if (di->lease_session && di->lease_session != session) @@ -1056,7 +1056,7 @@ static void update_dentry_lease(struct dentry *dentry, di->lease_seq = le32_to_cpu(lease->seq); di->lease_renew_after = half_ttl; di->lease_renew_from = 0; - dentry->d_time = ttl; + di->time = ttl; out_unlock: spin_unlock(&dentry->d_lock); return; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 46641bbc8056..0d4bb24c670a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3226,7 +3226,7 @@ static void handle_lease(struct ceph_mds_client *mdsc, msecs_to_jiffies(le32_to_cpu(h->duration_ms)); di->lease_seq = seq; - dentry->d_time = di->lease_renew_from + duration; + di->time = di->lease_renew_from + duration; di->lease_renew_after = di->lease_renew_from + (duration >> 1); di->lease_renew_from = 0; @@ -3311,7 +3311,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, if (!di || !di->lease_session || di->lease_session->s_mds < 0 || di->lease_gen != di->lease_session->s_cap_gen || - !time_before(jiffies, dentry->d_time)) { + !time_before(jiffies, di->time)) { dout("lease_release inode %p dentry %p -- " "no lease\n", inode, dentry); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index d5b9077467a4..5aa3158e8611 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -246,7 +246,7 @@ struct ceph_dentry_info { unsigned long lease_renew_after, lease_renew_from; struct list_head lru; struct dentry *dentry; - u64 time; + unsigned long time; u64 offset; }; -- cgit v1.2.3 From e4500b5e35c213e0f97be7cb69328c0877203a79 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 6 Jul 2016 11:12:56 +0800 Subject: ceph: use list instead of rbtree to track cap flushes We don't have requirement of searching cap flush by TID. In most cases, we just need to know TID of the oldest cap flush. List is ideal for this usage. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 120 +++++++++++++-------------------------------------- fs/ceph/inode.c | 2 +- fs/ceph/mds_client.c | 41 +++++++++--------- fs/ceph/mds_client.h | 2 +- fs/ceph/super.h | 9 ++-- 5 files changed, 56 insertions(+), 118 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 698cf002d3f1..e0efa75a1b98 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1413,52 +1413,6 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, return dirty; } -static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &ci->i_cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, i_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->i_node, parent, p); - rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); -} - -static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, - struct ceph_cap_flush *cf) -{ - struct rb_node **p = &mdsc->cap_flush_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_cap_flush *other = NULL; - - while (*p) { - parent = *p; - other = rb_entry(parent, struct ceph_cap_flush, g_node); - - if (cf->tid < other->tid) - p = &(*p)->rb_left; - else if (cf->tid > other->tid) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&cf->g_node, parent, p); - rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); -} - struct ceph_cap_flush *ceph_alloc_cap_flush(void) { return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); @@ -1472,10 +1426,10 @@ void ceph_free_cap_flush(struct ceph_cap_flush *cf) static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) { - struct rb_node *n = rb_first(&mdsc->cap_flush_tree); - if (n) { + if (!list_empty(&mdsc->cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, g_node); + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); return cf->tid; } return 0; @@ -1516,7 +1470,7 @@ static int __mark_caps_flushing(struct inode *inode, list_del_init(&ci->i_dirty_item); cf->tid = ++mdsc->last_cap_flush_tid; - __add_cap_flushing_to_mdsc(mdsc, cf); + list_add_tail(&cf->g_list, &mdsc->cap_flush_list); *oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) { @@ -1530,7 +1484,7 @@ static int __mark_caps_flushing(struct inode *inode, } spin_unlock(&mdsc->cap_dirty_lock); - __add_cap_flushing_to_inode(ci, cf); + list_add_tail(&cf->i_list, &ci->i_cap_flush_list); *flush_tid = cf->tid; return flushing; @@ -1890,10 +1844,10 @@ retry: spin_unlock(&ci->i_ceph_lock); } } else { - struct rb_node *n = rb_last(&ci->i_cap_flush_tree); - if (n) { + if (!list_empty(&ci->i_cap_flush_list)) { struct ceph_cap_flush *cf = - rb_entry(n, struct ceph_cap_flush, i_node); + list_last_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); flush_tid = cf->tid; } flushing = ci->i_flushing_caps; @@ -1913,14 +1867,13 @@ out: static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap_flush *cf; - struct rb_node *n; int ret = 1; spin_lock(&ci->i_ceph_lock); - n = rb_first(&ci->i_cap_flush_tree); - if (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (!list_empty(&ci->i_cap_flush_list)) { + struct ceph_cap_flush * cf = + list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); if (cf->tid <= flush_tid) ret = 0; } @@ -2083,7 +2036,6 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct ceph_cap_flush *cf; - struct rb_node *n; int delayed = 0; u64 first_tid = 0; u64 oldest_flush_tid; @@ -2092,8 +2044,11 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, oldest_flush_tid = __get_oldest_flush_tid(mdsc); spin_unlock(&mdsc->cap_dirty_lock); - while (true) { - spin_lock(&ci->i_ceph_lock); + spin_lock(&ci->i_ceph_lock); + list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { + if (cf->tid < first_tid) + continue; + cap = ci->i_auth_cap; if (!(cap && cap->session == session)) { pr_err("%p auth cap %p not mds%d ???\n", inode, @@ -2102,18 +2057,6 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, break; } - for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - if (cf->tid >= first_tid) - break; - } - if (!n) { - spin_unlock(&ci->i_ceph_lock); - break; - } - - cf = rb_entry(n, struct ceph_cap_flush, i_node); - first_tid = cf->tid + 1; dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, @@ -2123,7 +2066,10 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, __ceph_caps_wanted(ci), cap->issued | cap->implemented, cf->caps, cf->tid, oldest_flush_tid); + + spin_lock(&ci->i_ceph_lock); } + spin_unlock(&ci->i_ceph_lock); return delayed; } @@ -2995,23 +2941,19 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; - struct ceph_cap_flush *cf; - struct rb_node *n; + struct ceph_cap_flush *cf, *tmp_cf; LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - n = rb_first(&ci->i_cap_flush_tree); - while (n) { - cf = rb_entry(n, struct ceph_cap_flush, i_node); - n = rb_next(&cf->i_node); + list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { if (cf->tid == flush_tid) cleaned = cf->caps; if (cf->tid <= flush_tid) { - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add_tail(&cf->list, &to_remove); + list_del(&cf->i_list); + list_add_tail(&cf->i_list, &to_remove); } else { cleaned &= ~cf->caps; if (!cleaned) @@ -3033,12 +2975,12 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&to_remove)) { - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + u64 oldest_flush_tid; + list_for_each_entry(cf, &to_remove, i_list) + list_del(&cf->g_list); - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (!cf || cf->tid > flush_tid) + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + if (oldest_flush_tid == 0 || oldest_flush_tid > flush_tid) wake_up_all(&mdsc->cap_flushing_wq); } @@ -3075,8 +3017,8 @@ out: while (!list_empty(&to_remove)) { cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } if (drop) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index a38b768bc158..fd85b3c58960 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -468,7 +468,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); ci->i_prealloc_cap_flush = NULL; - ci->i_cap_flush_tree = RB_ROOT; + INIT_LIST_HEAD(&ci->i_cap_flush_list); init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bcf20344d904..7cd6b861c2f3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1148,19 +1148,17 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) invalidate = true; - while (true) { - struct rb_node *n = rb_first(&ci->i_cap_flush_tree); - if (!n) - break; - cf = rb_entry(n, struct ceph_cap_flush, i_node); - rb_erase(&cf->i_node, &ci->i_cap_flush_tree); - list_add(&cf->list, &to_remove); + while (!list_empty(&ci->i_cap_flush_list)) { + cf = list_first_entry(&ci->i_cap_flush_list, + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); + list_add(&cf->i_list, &to_remove); } spin_lock(&mdsc->cap_dirty_lock); - list_for_each_entry(cf, &to_remove, list) - rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + list_for_each_entry(cf, &to_remove, i_list) + list_del(&cf->g_list); if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( @@ -1184,7 +1182,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_unlock(&mdsc->cap_dirty_lock); if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { - list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); ci->i_prealloc_cap_flush = NULL; } } @@ -1192,8 +1190,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, while (!list_empty(&to_remove)) { struct ceph_cap_flush *cf; cf = list_first_entry(&to_remove, - struct ceph_cap_flush, list); - list_del(&cf->list); + struct ceph_cap_flush, i_list); + list_del(&cf->i_list); ceph_free_cap_flush(cf); } @@ -1499,17 +1497,18 @@ static int check_capsnap_flush(struct ceph_inode_info *ci, static int check_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_tid) { - struct rb_node *n; - struct ceph_cap_flush *cf; int ret = 1; spin_lock(&mdsc->cap_dirty_lock); - n = rb_first(&mdsc->cap_flush_tree); - cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; - if (cf && cf->tid <= want_flush_tid) { - dout("check_caps_flush still flushing tid %llu <= %llu\n", - cf->tid, want_flush_tid); - ret = 0; + if (!list_empty(&mdsc->cap_flush_list)) { + struct ceph_cap_flush *cf = + list_first_entry(&mdsc->cap_flush_list, + struct ceph_cap_flush, g_list); + if (cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid " + "%llu <= %llu\n", cf->tid, want_flush_tid); + ret = 0; + } } spin_unlock(&mdsc->cap_dirty_lock); return ret; @@ -3470,7 +3469,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->last_cap_flush_tid = 1; - mdsc->cap_flush_tree = RB_ROOT; + INIT_LIST_HEAD(&mdsc->cap_flush_list); INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3c154b8d49bf..93170b4b5d75 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -325,7 +325,7 @@ struct ceph_mds_client { spinlock_t snap_flush_lock; u64 last_cap_flush_tid; - struct rb_root cap_flush_tree; + struct list_head cap_flush_list; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 9e82e29f86a1..29e8b7bd9413 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,11 +189,8 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; - struct rb_node g_node; // global - union { - struct rb_node i_node; // inode - struct list_head list; - }; + struct list_head g_list; // global + struct list_head i_list; // per inode }; /* @@ -310,7 +307,7 @@ struct ceph_inode_info { * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ struct ceph_cap_flush *i_prealloc_cap_flush; - struct rb_root i_cap_flush_tree; + struct list_head i_cap_flush_list; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ -- cgit v1.2.3