diff options
author | Alex Elder <elder@inktank.com> | 2013-04-03 21:32:51 -0500 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-01 21:18:12 -0700 |
commit | 79528734f3ae4699a2886f62f55e18fb34fb3651 (patch) | |
tree | 51905378486b592fc2d4037d67ef3b577fe4eaa7 /net/ceph | |
parent | 430c28c3cb7f3dbd87de266ed52d65928957ff78 (diff) | |
download | blackbird-op-linux-79528734f3ae4699a2886f62f55e18fb34fb3651.tar.gz blackbird-op-linux-79528734f3ae4699a2886f62f55e18fb34fb3651.zip |
libceph: keep source rather than message osd op array
An osd request keeps a pointer to the osd operations (ops) array
that it builds in its request message.
In order to allow each op in the array to have its own distinct
data, we will need to keep track of each op's data, and that
information does not go over the wire.
As long as we're tracking the data we might as well just track the
entire (source) op definition for each of the ops. And if we're
doing that, we'll have no more need to keep a pointer to the
wire-encoded version.
This patch makes the array of source ops be kept with the osd
request structure, and uses that instead of the version encoded in
the message in places where that was previously used. The array
will be embedded in the request structure, and the maximum number of
ops we ever actually use is currently 2. So reduce CEPH_OSD_MAX_OP
to 2 to reduce the size of the structure.
The result of doing this sort of ripples back up, and as a result
various function parameters and local variables become unnecessary.
Make r_num_ops be unsigned, and move the definition of struct
ceph_osd_req_op earlier to ensure it's defined where needed.
It does not yet add per-op data, that's coming soon.
This resolves:
http://tracker.ceph.com/issues/4656
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/debugfs.c | 4 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 53 |
2 files changed, 29 insertions, 28 deletions
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 00d051f4894e..83661cdc0766 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -123,8 +123,8 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; + unsigned int i; int opcode; - int i; req = rb_entry(p, struct ceph_osd_request, r_node); @@ -142,7 +142,7 @@ static int osdc_show(struct seq_file *s, void *pp) seq_printf(s, "\t"); for (i = 0; i < req->r_num_ops; i++) { - opcode = le16_to_cpu(req->r_request_ops[i].op); + opcode = req->r_ops[i].op; seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e197c5c0b3a2..a498d2de17a4 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -186,6 +186,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_msg *msg; size_t msg_size; + BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); + BUG_ON(num_ops > CEPH_OSD_MAX_OP); + msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ @@ -207,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req->r_osdc = osdc; req->r_mempool = use_mempool; + req->r_num_ops = num_ops; kref_init(&req->r_kref); init_completion(&req->r_completion); @@ -418,12 +422,14 @@ void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, EXPORT_SYMBOL(osd_req_op_watch_init); static u64 osd_req_encode_op(struct ceph_osd_request *req, - struct ceph_osd_op *dst, - struct ceph_osd_req_op *src) + struct ceph_osd_op *dst, unsigned int which) { + struct ceph_osd_req_op *src; u64 out_data_len = 0; struct ceph_pagelist *pagelist; + BUG_ON(which >= req->r_num_ops); + src = &req->r_ops[which]; if (WARN_ON(!osd_req_opcode_valid(src->op))) { pr_err("unrecognized osd opcode %d\n", src->op); @@ -487,21 +493,17 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, * build new request AND message * */ -void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, unsigned int num_ops, - struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, u64 snap_id, - struct timespec *mtime) +void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, + struct ceph_snap_context *snapc, u64 snap_id, + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; - struct ceph_osd_req_op *src_op; void *p; size_t msg_size; int flags = req->r_flags; u64 data_len; - int i; + unsigned int i; - req->r_num_ops = num_ops; req->r_snapid = snap_id; req->r_snapc = ceph_get_snap_context(snapc); @@ -541,12 +543,10 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, p += req->r_oid_len; /* ops--can imply data */ - ceph_encode_16(&p, num_ops); - src_op = src_ops; - req->r_request_ops = p; + ceph_encode_16(&p, (u16)req->r_num_ops); data_len = 0; - for (i = 0; i < num_ops; i++, src_op++) { - data_len += osd_req_encode_op(req, p, src_op); + for (i = 0; i < req->r_num_ops; i++) { + data_len += osd_req_encode_op(req, p, i); p += sizeof(struct ceph_osd_op); } @@ -602,7 +602,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, u64 off, u64 *plen, int num_ops, - struct ceph_osd_req_op *ops, int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, @@ -610,6 +609,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, bool use_mempool) { struct ceph_osd_request *req; + struct ceph_osd_req_op *op; u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; @@ -623,6 +623,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ @@ -642,7 +643,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size = object_size; } - osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, + op = &req->r_ops[0]; + osd_req_op_extent_init(op, opcode, objoff, objlen, truncate_size, truncate_seq); /* * A second op in the ops array means the caller wants to @@ -650,7 +652,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, * osd will flush data quickly. */ if (num_ops > 1) - osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(++op, CEPH_OSD_OP_STARTSYNC); req->r_file_layout = *layout; /* keep a copy */ @@ -1342,7 +1344,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_osd_request *req; u64 tid; int object_len; - int numops, payload_len, flags; + unsigned int numops; + int payload_len, flags; s32 result; s32 retry_attempt; struct ceph_pg pg; @@ -1352,7 +1355,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, u32 osdmap_epoch; int already_completed; u32 bytes; - int i; + unsigned int i; tid = le64_to_cpu(msg->hdr.tid); dout("handle_reply %p tid %llu\n", msg, tid); @@ -2116,12 +2119,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct page **pages, int num_pages, int page_align) { struct ceph_osd_request *req; - struct ceph_osd_req_op op; int rc = 0; dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, *plen); - req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op, + req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, truncate_seq, truncate_size, false); @@ -2136,7 +2138,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, *plen, page_align); - ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); rc = ceph_osdc_start_request(osdc, req, false); if (!rc) @@ -2160,12 +2162,11 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct page **pages, int num_pages) { struct ceph_osd_request *req; - struct ceph_osd_req_op op; int rc = 0; int page_align = off & ~PAGE_MASK; BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, @@ -2178,7 +2179,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); - ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); + ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); rc = ceph_osdc_start_request(osdc, req, true); if (!rc) |