summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--include/linux/ceph/osd_client.h5
-rw-r--r--net/ceph/osd_client.c255
2 files changed, 125 insertions, 135 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index a1af29648fb5..e791b8e46353 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -74,7 +74,6 @@ struct ceph_osd_request {
char r_oid[40]; /* object name */
int r_oid_len;
unsigned long r_stamp; /* send OR check time */
- bool r_resend; /* msg send failed, needs retry */
struct ceph_file_layout r_file_layout;
struct ceph_snap_context *r_snapc; /* snap context for writes */
@@ -104,7 +103,9 @@ struct ceph_osd_client {
u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */
- struct list_head req_lru; /* pending requests lru */
+ struct list_head req_lru; /* in-flight lru */
+ struct list_head req_unsent; /* unsent/need-resend queue */
+ struct list_head req_notarget; /* map to no osd */
int num_requests;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3e20a122ffa2..b85ed5a5503d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -22,10 +22,9 @@
#define OSD_OPREPLY_FRONT_LEN 512
static const struct ceph_connection_operations osd_con_ops;
-static int __kick_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *kickosd);
-static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void send_queued(struct ceph_osd_client *osdc);
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static int op_needs_trail(int op)
{
@@ -529,6 +528,35 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
return NULL;
}
+/*
+ * Resubmit requests pending on the given osd.
+ */
+static void __kick_osd_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *osd)
+{
+ struct ceph_osd_request *req;
+ int err;
+
+ dout("__kick_osd_requests osd%d\n", osd->o_osd);
+ err = __reset_osd(osdc, osd);
+ if (err == -EAGAIN)
+ return;
+
+ list_for_each_entry(req, &osd->o_requests, r_osd_item) {
+ list_move(&req->r_req_lru_item, &osdc->req_unsent);
+ dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+ osd->o_osd);
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ }
+}
+
+static void kick_osd_requests(struct ceph_osd_client *osdc,
+ struct ceph_osd *kickosd)
+{
+ mutex_lock(&osdc->request_mutex);
+ __kick_osd_requests(osdc, kickosd);
+ mutex_unlock(&osdc->request_mutex);
+}
/*
* If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +571,8 @@ static void osd_reset(struct ceph_connection *con)
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
down_read(&osdc->map_sem);
- kick_requests(osdc, osd);
+ kick_osd_requests(osdc, osd);
+ send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -781,20 +810,20 @@ static void __cancel_request(struct ceph_osd_request *req)
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
req->r_sent = 0;
}
- list_del_init(&req->r_req_lru_item);
}
/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is
- * no up osd, set r_osd to NULL.
+ * no up osd, set r_osd to NULL. Move the request to the appropiate list
+ * (unsent, homeless) or leave on in-flight lru.
*
* Return 0 if unchanged, 1 if changed, or negative on error.
*
* Caller should hold map_sem for read and request_mutex.
*/
-static int __map_osds(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static int __map_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
struct ceph_pg pgid;
@@ -802,11 +831,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
int o = -1, num = 0;
int err;
- dout("map_osds %p tid %lld\n", req, req->r_tid);
+ dout("map_request %p tid %lld\n", req, req->r_tid);
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
&req->r_file_layout, osdc->osdmap);
- if (err)
+ if (err) {
+ list_move(&req->r_req_lru_item, &osdc->req_notarget);
return err;
+ }
pgid = reqhead->layout.ol_pgid;
req->r_pgid = pgid;
@@ -823,7 +854,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
(req->r_osd == NULL && o == -1))
return 0; /* no change */
- dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+ dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
req->r_osd ? req->r_osd->o_osd : -1);
@@ -841,10 +872,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (!req->r_osd && o >= 0) {
err = -ENOMEM;
req->r_osd = create_osd(osdc);
- if (!req->r_osd)
+ if (!req->r_osd) {
+ list_move(&req->r_req_lru_item, &osdc->req_notarget);
goto out;
+ }
- dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+ dout("map_request osd %p is osd%d\n", req->r_osd, o);
req->r_osd->o_osd = o;
req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
__insert_osd(osdc, req->r_osd);
@@ -855,6 +888,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
if (req->r_osd) {
__remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
+ list_move(&req->r_req_lru_item, &osdc->req_unsent);
+ } else {
+ list_move(&req->r_req_lru_item, &osdc->req_notarget);
}
err = 1; /* osd or pg changed */
@@ -869,16 +905,6 @@ static int __send_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
- int err;
-
- err = __map_osds(osdc, req);
- if (err < 0)
- return err;
- if (req->r_osd == NULL) {
- dout("send_request %p no up osds in pg\n", req);
- ceph_monc_request_next_osdmap(&osdc->client->monc);
- return 0;
- }
dout("send_request %p tid %llu to osd%d flags %d\n",
req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -898,6 +924,21 @@ static int __send_request(struct ceph_osd_client *osdc,
}
/*
+ * Send any requests in the queue (req_unsent).
+ */
+static void send_queued(struct ceph_osd_client *osdc)
+{
+ struct ceph_osd_request *req, *tmp;
+
+ dout("send_queued\n");
+ mutex_lock(&osdc->request_mutex);
+ list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+ __send_request(osdc, req);
+ }
+ mutex_unlock(&osdc->request_mutex);
+}
+
+/*
* Timeout callback, called every N seconds when 1 or more osd
* requests has been active for more than N seconds. When this
* happens, we ping all OSDs with requests who have timed out to
@@ -916,7 +957,6 @@ static void handle_timeout(struct work_struct *work)
unsigned long keepalive =
osdc->client->options->osd_keepalive_timeout * HZ;
unsigned long last_stamp = 0;
- struct rb_node *p;
struct list_head slow_osds;
dout("timeout\n");
@@ -925,21 +965,6 @@ static void handle_timeout(struct work_struct *work)
ceph_monc_request_next_osdmap(&osdc->client->monc);
mutex_lock(&osdc->request_mutex);
- for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
- req = rb_entry(p, struct ceph_osd_request, r_node);
-
- if (req->r_resend) {
- int err;
-
- dout("osdc resending prev failed %lld\n", req->r_tid);
- err = __send_request(osdc, req);
- if (err)
- dout("osdc failed again on %lld\n", req->r_tid);
- else
- req->r_resend = false;
- continue;
- }
- }
/*
* reset osds that appear to be _really_ unresponsive. this
@@ -963,7 +988,7 @@ static void handle_timeout(struct work_struct *work)
BUG_ON(!osd);
pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
req->r_tid, osd->o_osd);
- __kick_requests(osdc, osd);
+ __kick_osd_requests(osdc, osd);
}
/*
@@ -991,7 +1016,7 @@ static void handle_timeout(struct work_struct *work)
__schedule_osd_timeout(osdc);
mutex_unlock(&osdc->request_mutex);
-
+ send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -1109,108 +1134,61 @@ bad:
ceph_msg_dump(msg);
}
-
-static int __kick_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *kickosd)
+static void reset_changed_osds(struct ceph_osd_client *osdc)
{
- struct ceph_osd_request *req;
struct rb_node *p, *n;
- int needmap = 0;
- int err;
-
- dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
- if (kickosd) {
- err = __reset_osd(osdc, kickosd);
- if (err == -EAGAIN)
- return 1;
- } else {
- for (p = rb_first(&osdc->osds); p; p = n) {
- struct ceph_osd *osd =
- rb_entry(p, struct ceph_osd, o_node);
-
- n = rb_next(p);
- if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
- memcmp(&osd->o_con.peer_addr,
- ceph_osd_addr(osdc->osdmap,
- osd->o_osd),
- sizeof(struct ceph_entity_addr)) != 0)
- __reset_osd(osdc, osd);
- }
- }
-
- for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
- req = rb_entry(p, struct ceph_osd_request, r_node);
-
- if (req->r_resend) {
- dout(" r_resend set on tid %llu\n", req->r_tid);
- __cancel_request(req);
- goto kick;
- }
- if (req->r_osd && kickosd == req->r_osd) {
- __cancel_request(req);
- goto kick;
- }
- err = __map_osds(osdc, req);
- if (err == 0)
- continue; /* no change */
- if (err < 0) {
- /*
- * FIXME: really, we should set the request
- * error and fail if this isn't a 'nofail'
- * request, but that's a fair bit more
- * complicated to do. So retry!
- */
- dout(" setting r_resend on %llu\n", req->r_tid);
- req->r_resend = true;
- continue;
- }
- if (req->r_osd == NULL) {
- dout("tid %llu maps to no valid osd\n", req->r_tid);
- needmap++; /* request a newer map */
- continue;
- }
+ for (p = rb_first(&osdc->osds); p; p = n) {
+ struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
-kick:
- dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
- req->r_osd ? req->r_osd->o_osd : -1);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
- err = __send_request(osdc, req);
- if (err) {
- dout(" setting r_resend on %llu\n", req->r_tid);
- req->r_resend = true;
- }
+ n = rb_next(p);
+ if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+ memcmp(&osd->o_con.peer_addr,
+ ceph_osd_addr(osdc->osdmap,
+ osd->o_osd),
+ sizeof(struct ceph_entity_addr)) != 0)
+ __reset_osd(osdc, osd);
}
-
- return needmap;
}
/*
- * Resubmit osd requests whose osd or osd address has changed. Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
+ * Requeue requests whose mapping to an OSD has changed. If requests map to
+ * no osd, request a new map.
*
* Caller should hold map_sem for read and request_mutex.
*/
-static void kick_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *kickosd)
+static void kick_requests(struct ceph_osd_client *osdc)
{
- int needmap;
+ struct ceph_osd_request *req;
+ struct rb_node *p;
+ int needmap = 0;
+ int err;
+ dout("kick_requests\n");
mutex_lock(&osdc->request_mutex);
- needmap = __kick_requests(osdc, kickosd);
+ for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+ req = rb_entry(p, struct ceph_osd_request, r_node);
+ err = __map_request(osdc, req);
+ if (err < 0)
+ continue; /* error */
+ if (req->r_osd == NULL) {
+ dout("%p tid %llu maps to no osd\n", req, req->r_tid);
+ needmap++; /* request a newer map */
+ } else if (err > 0) {
+ dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
+ req->r_osd ? req->r_osd->o_osd : -1);
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ }
+ }
mutex_unlock(&osdc->request_mutex);
if (needmap) {
dout("%d requests for down osds, need new map\n", needmap);
ceph_monc_request_next_osdmap(&osdc->client->monc);
}
-
}
+
+
/*
* Process updated osd map.
*
@@ -1263,6 +1241,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = newmap;
}
+ kick_requests(osdc);
+ reset_changed_osds(osdc);
} else {
dout("ignoring incremental map %u len %d\n",
epoch, maplen);
@@ -1300,6 +1280,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
osdc->osdmap = newmap;
if (oldmap)
ceph_osdmap_destroy(oldmap);
+ kick_requests(osdc);
}
p += maplen;
nr_maps--;
@@ -1308,8 +1289,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
done:
downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
- if (newmap)
- kick_requests(osdc, NULL);
+ send_queued(osdc);
up_read(&osdc->map_sem);
wake_up_all(&osdc->client->auth_wq);
return;
@@ -1347,15 +1327,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
* the request still han't been touched yet.
*/
if (req->r_sent == 0) {
- rc = __send_request(osdc, req);
- if (rc) {
- if (nofail) {
- dout("osdc_start_request failed send, "
- " marking %lld\n", req->r_tid);
- req->r_resend = true;
- rc = 0;
- } else {
- __unregister_request(osdc, req);
+ rc = __map_request(osdc, req);
+ if (rc < 0)
+ return rc;
+ if (req->r_osd == NULL) {
+ dout("send_request %p no up osds in pg\n", req);
+ ceph_monc_request_next_osdmap(&osdc->client->monc);
+ } else {
+ rc = __send_request(osdc, req);
+ if (rc) {
+ if (nofail) {
+ dout("osdc_start_request failed send, "
+ " will retry %lld\n", req->r_tid);
+ rc = 0;
+ } else {
+ __unregister_request(osdc, req);
+ }
}
}
}
@@ -1441,6 +1428,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
INIT_LIST_HEAD(&osdc->req_lru);
+ INIT_LIST_HEAD(&osdc->req_unsent);
+ INIT_LIST_HEAD(&osdc->req_notarget);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
OpenPOWER on IntegriCloud