summaryrefslogtreecommitdiffstats
path: root/net/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r--net/ceph/osd_client.c283
1 files changed, 255 insertions, 28 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2b4b32aaa893..010ff3bd58ad 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
msg_size = 4 + 4 + 8 + 8 + 4+8;
msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pg_t */
- msg_size += 4 + MAX_OBJ_NAME_SIZE;
+ msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
msg_size += 8; /* snapid */
msg_size += 8; /* snap_seq */
@@ -368,6 +368,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
+ req->r_base_oloc.pool = -1;
+ req->r_target_oloc.pool = -1;
+
/* create reply message */
if (use_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,11 +764,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
if (num_ops > 1)
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
- req->r_file_layout = *layout; /* keep a copy */
+ req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
- snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
- vino.ino, objnum);
- req->r_oid_len = strlen(req->r_oid);
+ snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
+ "%llx.%08llx", vino.ino, objnum);
+ req->r_base_oid.name_len = strlen(req->r_base_oid.name);
return req;
}
@@ -1044,8 +1047,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
!ceph_con_opened(&osd->o_con)) {
struct ceph_osd_request *req;
- dout(" osd addr hasn't changed and connection never opened,"
- " letting msgr retry");
+ dout("osd addr hasn't changed and connection never opened, "
+ "letting msgr retry\n");
/* touch each r_stamp for handle_timeout()'s benfit */
list_for_each_entry(req, &osd->o_requests, r_osd_item)
req->r_stamp = jiffies;
@@ -1232,6 +1235,61 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
/*
+ * Returns whether a request should be blocked from being sent
+ * based on the current osdmap and osd_client settings.
+ *
+ * Caller should hold map_sem for read.
+ */
+static bool __req_should_be_paused(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+ bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+ return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
+ (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
+}
+
+/*
+ * Calculate mapping of a request to a PG. Takes tiering into account.
+ */
+static int __calc_request_pg(struct ceph_osdmap *osdmap,
+ struct ceph_osd_request *req,
+ struct ceph_pg *pg_out)
+{
+ bool need_check_tiering;
+
+ need_check_tiering = false;
+ if (req->r_target_oloc.pool == -1) {
+ req->r_target_oloc = req->r_base_oloc; /* struct */
+ need_check_tiering = true;
+ }
+ if (req->r_target_oid.name_len == 0) {
+ ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+ need_check_tiering = true;
+ }
+
+ if (need_check_tiering &&
+ (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ struct ceph_pg_pool_info *pi;
+
+ pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
+ if (pi) {
+ if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+ pi->read_tier >= 0)
+ req->r_target_oloc.pool = pi->read_tier;
+ if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+ pi->write_tier >= 0)
+ req->r_target_oloc.pool = pi->write_tier;
+ }
+ /* !pi is caught in ceph_oloc_oid_to_pg() */
+ }
+
+ return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+ &req->r_target_oid, pg_out);
+}
+
+/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is
* no up osd, set r_osd to NULL. Move the request to the appropriate list
@@ -1248,10 +1306,11 @@ static int __map_request(struct ceph_osd_client *osdc,
int acting[CEPH_PG_MAX_SIZE];
int o = -1, num = 0;
int err;
+ bool was_paused;
dout("map_request %p tid %lld\n", req, req->r_tid);
- err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
- ceph_file_layout_pg_pool(req->r_file_layout));
+
+ err = __calc_request_pg(osdc->osdmap, req, &pgid);
if (err) {
list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err;
@@ -1264,12 +1323,18 @@ static int __map_request(struct ceph_osd_client *osdc,
num = err;
}
+ was_paused = req->r_paused;
+ req->r_paused = __req_should_be_paused(osdc, req);
+ if (was_paused && !req->r_paused)
+ force_resend = 1;
+
if ((!force_resend &&
req->r_osd && req->r_osd->o_osd == o &&
req->r_sent >= req->r_osd->o_incarnation &&
req->r_num_pg_osds == num &&
memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
- (req->r_osd == NULL && o == -1))
+ (req->r_osd == NULL && o == -1) ||
+ req->r_paused)
return 0; /* no change */
dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
/* fill in message content that changes each time we send it */
put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
put_unaligned_le32(req->r_flags, req->r_request_flags);
- put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+ put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
p = req->r_request_pgid;
ceph_encode_64(&p, req->r_pgid.pool);
ceph_encode_32(&p, req->r_pgid.seed);
@@ -1432,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
round_jiffies_relative(delay));
}
+static int ceph_oloc_decode(void **p, void *end,
+ struct ceph_object_locator *oloc)
+{
+ u8 struct_v, struct_cv;
+ u32 len;
+ void *struct_end;
+ int ret = 0;
+
+ ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+ struct_v = ceph_decode_8(p);
+ struct_cv = ceph_decode_8(p);
+ if (struct_v < 3) {
+ pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ if (struct_cv > 6) {
+ pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, e_inval);
+ struct_end = *p + len;
+
+ oloc->pool = ceph_decode_64(p);
+ *p += 4; /* skip preferred */
+
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_object_locator::key is set\n");
+ goto e_inval;
+ }
+
+ if (struct_v >= 5) {
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_object_locator::nspace is set\n");
+ goto e_inval;
+ }
+ }
+
+ if (struct_v >= 6) {
+ s64 hash = ceph_decode_64(p);
+ if (hash != -1) {
+ pr_warn("ceph_object_locator::hash is set\n");
+ goto e_inval;
+ }
+ }
+
+ /* skip the rest */
+ *p = struct_end;
+out:
+ return ret;
+
+e_inval:
+ ret = -EINVAL;
+ goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+ struct ceph_request_redirect *redir)
+{
+ u8 struct_v, struct_cv;
+ u32 len;
+ void *struct_end;
+ int ret;
+
+ ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+ struct_v = ceph_decode_8(p);
+ struct_cv = ceph_decode_8(p);
+ if (struct_cv > 1) {
+ pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, e_inval);
+ struct_end = *p + len;
+
+ ret = ceph_oloc_decode(p, end, &redir->oloc);
+ if (ret)
+ goto out;
+
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_request_redirect::object_name is set\n");
+ goto e_inval;
+ }
+
+ len = ceph_decode_32(p);
+ *p += len; /* skip osd_instructions */
+
+ /* skip the rest */
+ *p = struct_end;
+out:
+ return ret;
+
+e_inval:
+ ret = -EINVAL;
+ goto out;
+}
+
static void complete_request(struct ceph_osd_request *req)
{
complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1446,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
{
void *p, *end;
struct ceph_osd_request *req;
+ struct ceph_request_redirect redir;
u64 tid;
int object_len;
unsigned int numops;
@@ -1525,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
for (i = 0; i < numops; i++)
req->r_reply_op_result[i] = ceph_decode_32(&p);
- already_completed = req->r_got_reply;
+ if (le16_to_cpu(msg->hdr.version) >= 6) {
+ p += 8 + 4; /* skip replay_version */
+ p += 8; /* skip user_version */
- if (!req->r_got_reply) {
+ err = ceph_redirect_decode(&p, end, &redir);
+ if (err)
+ goto bad_put;
+ } else {
+ redir.oloc.pool = -1;
+ }
+
+ if (redir.oloc.pool != -1) {
+ dout("redirect pool %lld\n", redir.oloc.pool);
+
+ __unregister_request(osdc, req);
+ mutex_unlock(&osdc->request_mutex);
+
+ req->r_target_oloc = redir.oloc; /* struct */
+
+ /*
+ * Start redirect requests with nofail=true. If
+ * mapping fails, request will end up on the notarget
+ * list, waiting for the new osdmap (which can take
+ * a while), even though the original request mapped
+ * successfully. In the future we might want to follow
+ * original request's nofail setting here.
+ */
+ err = ceph_osdc_start_request(osdc, req, true);
+ BUG_ON(err);
+ goto done;
+ }
+
+ already_completed = req->r_got_reply;
+ if (!req->r_got_reply) {
req->r_result = result;
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);
@@ -1581,6 +1781,13 @@ done:
return;
bad_put:
+ req->r_result = -EIO;
+ __unregister_request(osdc, req);
+ if (req->r_callback)
+ req->r_callback(req, msg);
+ else
+ complete_all(&req->r_completion);
+ complete_request(req);
ceph_osdc_put_request(req);
bad_mutex:
mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1820,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
*
* Caller should hold map_sem for read.
*/
-static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
+static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
+ bool force_resend_writes)
{
struct ceph_osd_request *req, *nreq;
struct rb_node *p;
int needmap = 0;
int err;
+ bool force_resend_req;
- dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
+ dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
+ force_resend_writes ? " (force resend writes)" : "");
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; ) {
req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1855,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
continue;
}
- err = __map_request(osdc, req, force_resend);
+ force_resend_req = force_resend ||
+ (force_resend_writes &&
+ req->r_flags & CEPH_OSD_FLAG_WRITE);
+ err = __map_request(osdc, req, force_resend_req);
if (err < 0)
continue; /* error */
if (req->r_osd == NULL) {
@@ -1665,7 +1878,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
r_linger_item) {
dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
- err = __map_request(osdc, req, force_resend);
+ err = __map_request(osdc, req,
+ force_resend || force_resend_writes);
dout("__map_request returned %d\n", err);
if (err == 0)
continue; /* no change and no osd was specified */
@@ -1707,6 +1921,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
struct ceph_osdmap *newmap = NULL, *oldmap;
int err;
struct ceph_fsid fsid;
+ bool was_full;
dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
p = msg->front.iov_base;
@@ -1720,6 +1935,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
down_write(&osdc->map_sem);
+ was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+
/* incremental maps */
ceph_decode_32_safe(&p, end, nr_maps, bad);
dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1961,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
}
- kick_requests(osdc, 0);
+ was_full = was_full ||
+ ceph_osdmap_flag(osdc->osdmap,
+ CEPH_OSDMAP_FULL);
+ kick_requests(osdc, 0, was_full);
} else {
dout("ignoring incremental map %u len %d\n",
epoch, maplen);
@@ -1787,7 +2007,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
skipped_map = 1;
ceph_osdmap_destroy(oldmap);
}
- kick_requests(osdc, skipped_map);
+ was_full = was_full ||
+ ceph_osdmap_flag(osdc->osdmap,
+ CEPH_OSDMAP_FULL);
+ kick_requests(osdc, skipped_map, was_full);
}
p += maplen;
nr_maps--;
@@ -1804,7 +2027,9 @@ done:
* we find out when we are no longer full and stop returning
* ENOSPC.
*/
- if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
+ if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+ ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
ceph_monc_request_next_osdmap(&osdc->client->monc);
mutex_lock(&osdc->request_mutex);
@@ -2068,10 +2293,11 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
ceph_encode_32(&p, -1); /* preferred */
/* oid */
- ceph_encode_32(&p, req->r_oid_len);
- memcpy(p, req->r_oid, req->r_oid_len);
- dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
- p += req->r_oid_len;
+ ceph_encode_32(&p, req->r_base_oid.name_len);
+ memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
+ dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
+ req->r_base_oid.name, req->r_base_oid.name_len);
+ p += req->r_base_oid.name_len;
/* ops--can imply data */
ceph_encode_16(&p, (u16)req->r_num_ops);
@@ -2454,7 +2680,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_osd_client *osdc = osd->o_osdc;
struct ceph_msg *m;
struct ceph_osd_request *req;
- int front = le32_to_cpu(hdr->front_len);
+ int front_len = le32_to_cpu(hdr->front_len);
int data_len = le32_to_cpu(hdr->data_len);
u64 tid;
@@ -2474,12 +2700,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
req->r_reply, req->r_reply->con);
ceph_msg_revoke_incoming(req->r_reply);
- if (front > req->r_reply->front.iov_len) {
+ if (front_len > req->r_reply->front_alloc_len) {
pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
- front, (int)req->r_reply->front.iov_len,
+ front_len, req->r_reply->front_alloc_len,
(unsigned int)con->peer_name.type,
le64_to_cpu(con->peer_name.num));
- m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
+ m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
+ false);
if (!m)
goto out;
ceph_msg_put(req->r_reply);
OpenPOWER on IntegriCloud