diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/ceph_common.c | 4 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 33 | ||||
-rw-r--r-- | net/ceph/crypto.c | 101 | ||||
-rw-r--r-- | net/ceph/debugfs.c | 17 | ||||
-rw-r--r-- | net/ceph/messenger.c | 44 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 457 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 123 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 19 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 2 |
9 files changed, 464 insertions, 336 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index bcbec33c6a14..dcc18c6f7cf9 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -361,7 +361,6 @@ ceph_parse_options(char *options, const char *dev_name, opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; - opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT; /* get mon ip(s) */ /* ip1[:port1][,ip2[:port2]...] */ @@ -686,6 +685,9 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started) return client->auth_err; } + pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid); + ceph_debugfs_client_init(client); + return 0; } EXPORT_SYMBOL(__ceph_open_session); diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 393bfb22d5bb..5fcfb98f309e 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -403,6 +403,7 @@ static int is_out(const struct crush_map *map, * @local_retries: localized retries * @local_fallback_retries: localized fallback retries * @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose) + * @stable: stable mode starts rep=0 in the recursive call for all replicas * @vary_r: pass r to recursive calls * @out2: second output vector for leaf items (if @recurse_to_leaf) * @parent_r: r value passed from the parent @@ -419,6 +420,7 @@ static int crush_choose_firstn(const struct crush_map *map, unsigned int local_fallback_retries, int recurse_to_leaf, unsigned int vary_r, + unsigned int stable, int *out2, int parent_r) { @@ -433,13 +435,13 @@ static int crush_choose_firstn(const struct crush_map *map, int collide, reject; int count = out_size; - dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d\n", + dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d recurse_tries %d local_retries %d local_fallback_retries %d parent_r %d stable %d\n", recurse_to_leaf ? "_LEAF" : "", bucket->id, x, outpos, numrep, tries, recurse_tries, local_retries, local_fallback_retries, - parent_r); + parent_r, stable); - for (rep = outpos; rep < numrep && count > 0 ; rep++) { + for (rep = stable ? 0 : outpos; rep < numrep && count > 0 ; rep++) { /* keep trying until we get a non-out, non-colliding item */ ftotal = 0; skip_rep = 0; @@ -512,13 +514,14 @@ static int crush_choose_firstn(const struct crush_map *map, if (crush_choose_firstn(map, map->buckets[-1-item], weight, weight_max, - x, outpos+1, 0, + x, stable ? 1 : outpos+1, 0, out2, outpos, count, recurse_tries, 0, local_retries, local_fallback_retries, 0, vary_r, + stable, NULL, sub_r) <= outpos) /* didn't get leaf */ @@ -816,6 +819,7 @@ int crush_do_rule(const struct crush_map *map, int choose_local_fallback_retries = map->choose_local_fallback_tries; int vary_r = map->chooseleaf_vary_r; + int stable = map->chooseleaf_stable; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -835,7 +839,8 @@ int crush_do_rule(const struct crush_map *map, case CRUSH_RULE_TAKE: if ((curstep->arg1 >= 0 && curstep->arg1 < map->max_devices) || - (-1-curstep->arg1 < map->max_buckets && + (-1-curstep->arg1 >= 0 && + -1-curstep->arg1 < map->max_buckets && map->buckets[-1-curstep->arg1])) { w[0] = curstep->arg1; wsize = 1; @@ -869,6 +874,11 @@ int crush_do_rule(const struct crush_map *map, vary_r = curstep->arg1; break; + case CRUSH_RULE_SET_CHOOSELEAF_STABLE: + if (curstep->arg1 >= 0) + stable = curstep->arg1; + break; + case CRUSH_RULE_CHOOSELEAF_FIRSTN: case CRUSH_RULE_CHOOSE_FIRSTN: firstn = 1; @@ -888,6 +898,7 @@ int crush_do_rule(const struct crush_map *map, osize = 0; for (i = 0; i < wsize; i++) { + int bno; /* * see CRUSH_N, CRUSH_N_MINUS macros. * basically, numrep <= 0 means relative to @@ -900,6 +911,13 @@ int crush_do_rule(const struct crush_map *map, continue; } j = 0; + /* make sure bucket id is valid */ + bno = -1 - w[i]; + if (bno < 0 || bno >= map->max_buckets) { + /* w[i] is probably CRUSH_ITEM_NONE */ + dprintk(" bad w[i] %d\n", w[i]); + continue; + } if (firstn) { int recurse_tries; if (choose_leaf_tries) @@ -911,7 +929,7 @@ int crush_do_rule(const struct crush_map *map, recurse_tries = choose_tries; osize += crush_choose_firstn( map, - map->buckets[-1-w[i]], + map->buckets[bno], weight, weight_max, x, numrep, curstep->arg2, @@ -923,6 +941,7 @@ int crush_do_rule(const struct crush_map *map, choose_local_fallback_retries, recurse_to_leaf, vary_r, + stable, c+osize, 0); } else { @@ -930,7 +949,7 @@ int crush_do_rule(const struct crush_map *map, numrep : (result_max-osize)); crush_choose_indep( map, - map->buckets[-1-w[i]], + map->buckets[bno], weight, weight_max, x, out_size, numrep, curstep->arg2, diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index 42e8649c6e79..db2847ac5f12 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -4,7 +4,8 @@ #include <linux/err.h> #include <linux/scatterlist.h> #include <linux/slab.h> -#include <crypto/hash.h> +#include <crypto/aes.h> +#include <crypto/skcipher.h> #include <linux/key-type.h> #include <keys/ceph-type.h> @@ -79,9 +80,9 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey) return 0; } -static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) +static struct crypto_skcipher *ceph_crypto_alloc_cipher(void) { - return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); + return crypto_alloc_skcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); } static const u8 *aes_iv = (u8 *)CEPH_AES_IV; @@ -162,11 +163,10 @@ static int ceph_aes_encrypt(const void *key, int key_len, { struct scatterlist sg_in[2], prealloc_sg; struct sg_table sg_out; - struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); - struct blkcipher_desc desc = { .tfm = tfm, .flags = 0 }; + struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); + SKCIPHER_REQUEST_ON_STACK(req, tfm); int ret; - void *iv; - int ivsize; + char iv[AES_BLOCK_SIZE]; size_t zero_padding = (0x10 - (src_len & 0x0f)); char pad[16]; @@ -184,10 +184,13 @@ static int ceph_aes_encrypt(const void *key, int key_len, if (ret) goto out_tfm; - crypto_blkcipher_setkey((void *)tfm, key, key_len); - iv = crypto_blkcipher_crt(tfm)->iv; - ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + crypto_skcipher_setkey((void *)tfm, key, key_len); + memcpy(iv, aes_iv, AES_BLOCK_SIZE); + + skcipher_request_set_tfm(req, tfm); + skcipher_request_set_callback(req, 0, NULL, NULL); + skcipher_request_set_crypt(req, sg_in, sg_out.sgl, + src_len + zero_padding, iv); /* print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, @@ -197,8 +200,8 @@ static int ceph_aes_encrypt(const void *key, int key_len, print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, pad, zero_padding, 1); */ - ret = crypto_blkcipher_encrypt(&desc, sg_out.sgl, sg_in, - src_len + zero_padding); + ret = crypto_skcipher_encrypt(req); + skcipher_request_zero(req); if (ret < 0) { pr_err("ceph_aes_crypt failed %d\n", ret); goto out_sg; @@ -211,7 +214,7 @@ static int ceph_aes_encrypt(const void *key, int key_len, out_sg: teardown_sgtable(&sg_out); out_tfm: - crypto_free_blkcipher(tfm); + crypto_free_skcipher(tfm); return ret; } @@ -222,11 +225,10 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, { struct scatterlist sg_in[3], prealloc_sg; struct sg_table sg_out; - struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); - struct blkcipher_desc desc = { .tfm = tfm, .flags = 0 }; + struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); + SKCIPHER_REQUEST_ON_STACK(req, tfm); int ret; - void *iv; - int ivsize; + char iv[AES_BLOCK_SIZE]; size_t zero_padding = (0x10 - ((src1_len + src2_len) & 0x0f)); char pad[16]; @@ -245,10 +247,13 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, if (ret) goto out_tfm; - crypto_blkcipher_setkey((void *)tfm, key, key_len); - iv = crypto_blkcipher_crt(tfm)->iv; - ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + crypto_skcipher_setkey((void *)tfm, key, key_len); + memcpy(iv, aes_iv, AES_BLOCK_SIZE); + + skcipher_request_set_tfm(req, tfm); + skcipher_request_set_callback(req, 0, NULL, NULL); + skcipher_request_set_crypt(req, sg_in, sg_out.sgl, + src1_len + src2_len + zero_padding, iv); /* print_hex_dump(KERN_ERR, "enc key: ", DUMP_PREFIX_NONE, 16, 1, @@ -260,8 +265,8 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, print_hex_dump(KERN_ERR, "enc pad: ", DUMP_PREFIX_NONE, 16, 1, pad, zero_padding, 1); */ - ret = crypto_blkcipher_encrypt(&desc, sg_out.sgl, sg_in, - src1_len + src2_len + zero_padding); + ret = crypto_skcipher_encrypt(req); + skcipher_request_zero(req); if (ret < 0) { pr_err("ceph_aes_crypt2 failed %d\n", ret); goto out_sg; @@ -274,7 +279,7 @@ static int ceph_aes_encrypt2(const void *key, int key_len, void *dst, out_sg: teardown_sgtable(&sg_out); out_tfm: - crypto_free_blkcipher(tfm); + crypto_free_skcipher(tfm); return ret; } @@ -284,11 +289,10 @@ static int ceph_aes_decrypt(const void *key, int key_len, { struct sg_table sg_in; struct scatterlist sg_out[2], prealloc_sg; - struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); - struct blkcipher_desc desc = { .tfm = tfm }; + struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); + SKCIPHER_REQUEST_ON_STACK(req, tfm); char pad[16]; - void *iv; - int ivsize; + char iv[AES_BLOCK_SIZE]; int ret; int last_byte; @@ -302,10 +306,13 @@ static int ceph_aes_decrypt(const void *key, int key_len, if (ret) goto out_tfm; - crypto_blkcipher_setkey((void *)tfm, key, key_len); - iv = crypto_blkcipher_crt(tfm)->iv; - ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + crypto_skcipher_setkey((void *)tfm, key, key_len); + memcpy(iv, aes_iv, AES_BLOCK_SIZE); + + skcipher_request_set_tfm(req, tfm); + skcipher_request_set_callback(req, 0, NULL, NULL); + skcipher_request_set_crypt(req, sg_in.sgl, sg_out, + src_len, iv); /* print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1, @@ -313,7 +320,8 @@ static int ceph_aes_decrypt(const void *key, int key_len, print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, src, src_len, 1); */ - ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in.sgl, src_len); + ret = crypto_skcipher_decrypt(req); + skcipher_request_zero(req); if (ret < 0) { pr_err("ceph_aes_decrypt failed %d\n", ret); goto out_sg; @@ -338,7 +346,7 @@ static int ceph_aes_decrypt(const void *key, int key_len, out_sg: teardown_sgtable(&sg_in); out_tfm: - crypto_free_blkcipher(tfm); + crypto_free_skcipher(tfm); return ret; } @@ -349,11 +357,10 @@ static int ceph_aes_decrypt2(const void *key, int key_len, { struct sg_table sg_in; struct scatterlist sg_out[3], prealloc_sg; - struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher(); - struct blkcipher_desc desc = { .tfm = tfm }; + struct crypto_skcipher *tfm = ceph_crypto_alloc_cipher(); + SKCIPHER_REQUEST_ON_STACK(req, tfm); char pad[16]; - void *iv; - int ivsize; + char iv[AES_BLOCK_SIZE]; int ret; int last_byte; @@ -368,10 +375,13 @@ static int ceph_aes_decrypt2(const void *key, int key_len, if (ret) goto out_tfm; - crypto_blkcipher_setkey((void *)tfm, key, key_len); - iv = crypto_blkcipher_crt(tfm)->iv; - ivsize = crypto_blkcipher_ivsize(tfm); - memcpy(iv, aes_iv, ivsize); + crypto_skcipher_setkey((void *)tfm, key, key_len); + memcpy(iv, aes_iv, AES_BLOCK_SIZE); + + skcipher_request_set_tfm(req, tfm); + skcipher_request_set_callback(req, 0, NULL, NULL); + skcipher_request_set_crypt(req, sg_in.sgl, sg_out, + src_len, iv); /* print_hex_dump(KERN_ERR, "dec key: ", DUMP_PREFIX_NONE, 16, 1, @@ -379,7 +389,8 @@ static int ceph_aes_decrypt2(const void *key, int key_len, print_hex_dump(KERN_ERR, "dec in: ", DUMP_PREFIX_NONE, 16, 1, src, src_len, 1); */ - ret = crypto_blkcipher_decrypt(&desc, sg_out, sg_in.sgl, src_len); + ret = crypto_skcipher_decrypt(req); + skcipher_request_zero(req); if (ret < 0) { pr_err("ceph_aes_decrypt failed %d\n", ret); goto out_sg; @@ -415,7 +426,7 @@ static int ceph_aes_decrypt2(const void *key, int key_len, out_sg: teardown_sgtable(&sg_in); out_tfm: - crypto_free_blkcipher(tfm); + crypto_free_skcipher(tfm); return ret; } diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 593dc2eabcc8..b902fbc7863e 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -112,15 +112,20 @@ static int monc_show(struct seq_file *s, void *p) struct ceph_mon_generic_request *req; struct ceph_mon_client *monc = &client->monc; struct rb_node *rp; + int i; mutex_lock(&monc->mutex); - if (monc->have_mdsmap) - seq_printf(s, "have mdsmap %u\n", (unsigned int)monc->have_mdsmap); - if (monc->have_osdmap) - seq_printf(s, "have osdmap %u\n", (unsigned int)monc->have_osdmap); - if (monc->want_next_osdmap) - seq_printf(s, "want next osdmap\n"); + for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { + seq_printf(s, "have %s %u", ceph_sub_str[i], + monc->subs[i].have); + if (monc->subs[i].want) + seq_printf(s, " want %llu%s", + le64_to_cpu(monc->subs[i].item.start), + (monc->subs[i].item.flags & + CEPH_SUBSCRIBE_ONETIME ? "" : "+")); + seq_putc(s, '\n'); + } for (rp = rb_first(&monc->generic_request_tree); rp; rp = rb_next(rp)) { __u16 op; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 9cfedf565f5b..1831f6353622 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -235,18 +235,12 @@ static struct workqueue_struct *ceph_msgr_wq; static int ceph_msgr_slab_init(void) { BUG_ON(ceph_msg_cache); - ceph_msg_cache = kmem_cache_create("ceph_msg", - sizeof (struct ceph_msg), - __alignof__(struct ceph_msg), 0, NULL); - + ceph_msg_cache = KMEM_CACHE(ceph_msg, 0); if (!ceph_msg_cache) return -ENOMEM; BUG_ON(ceph_msg_data_cache); - ceph_msg_data_cache = kmem_cache_create("ceph_msg_data", - sizeof (struct ceph_msg_data), - __alignof__(struct ceph_msg_data), - 0, NULL); + ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0); if (ceph_msg_data_cache) return 0; @@ -1197,6 +1191,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, return new_piece; } +static size_t sizeof_footer(struct ceph_connection *con) +{ + return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? + sizeof(struct ceph_msg_footer) : + sizeof(struct ceph_msg_footer_old); +} + static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { BUG_ON(!msg); @@ -1214,25 +1215,19 @@ static void prepare_message_data(struct ceph_msg *msg, u32 data_len) static void prepare_write_message_footer(struct ceph_connection *con) { struct ceph_msg *m = con->out_msg; - int v = con->out_kvec_left; m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; dout("prepare_write_message_footer %p\n", con); - con->out_kvec[v].iov_base = &m->footer; + con_out_kvec_add(con, sizeof_footer(con), &m->footer); if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) con->ops->sign_message(m); else m->footer.sig = 0; - con->out_kvec[v].iov_len = sizeof(m->footer); - con->out_kvec_bytes += sizeof(m->footer); } else { m->old_footer.flags = m->footer.flags; - con->out_kvec[v].iov_len = sizeof(m->old_footer); - con->out_kvec_bytes += sizeof(m->old_footer); } - con->out_kvec_left++; con->out_more = m->more_to_follow; con->out_msg_done = true; } @@ -2335,9 +2330,9 @@ static int read_partial_message(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr), seq, con->in_seq + 1); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; - return 0; + return 1; } else if ((s64)seq - (s64)con->in_seq > 1) { pr_err("read_partial_message bad seq %lld expected %lld\n", seq, con->in_seq + 1); @@ -2360,10 +2355,10 @@ static int read_partial_message(struct ceph_connection *con) /* skip this message */ dout("alloc_msg said skip message\n"); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; - return 0; + return 1; } BUG_ON(!con->in_msg); @@ -2402,11 +2397,7 @@ static int read_partial_message(struct ceph_connection *con) } /* footer */ - if (need_sign) - size = sizeof(m->footer); - else - size = sizeof(m->old_footer); - + size = sizeof_footer(con); end += size; ret = read_partial(con, end, size, &m->footer); if (ret <= 0) @@ -3082,10 +3073,7 @@ void ceph_msg_revoke(struct ceph_msg *msg) con->out_skip += con_out_kvec_skip(con); } else { BUG_ON(!msg->data_length); - if (con->peer_features & CEPH_FEATURE_MSG_AUTH) - con->out_skip += sizeof(msg->footer); - else - con->out_skip += sizeof(msg->old_footer); + con->out_skip += sizeof_footer(con); } /* data, middle, front */ if (msg->data_length) diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index de85dddc3dc0..cf638c009cfa 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -122,51 +122,91 @@ static void __close_session(struct ceph_mon_client *monc) ceph_msg_revoke(monc->m_subscribe); ceph_msg_revoke_incoming(monc->m_subscribe_ack); ceph_con_close(&monc->con); - monc->cur_mon = -1; + monc->pending_auth = 0; ceph_auth_reset(monc->auth); } /* - * Open a session with a (new) monitor. + * Pick a new monitor at random and set cur_mon. If we are repicking + * (i.e. cur_mon is already set), be sure to pick a different one. */ -static int __open_session(struct ceph_mon_client *monc) +static void pick_new_mon(struct ceph_mon_client *monc) { - char r; - int ret; + int old_mon = monc->cur_mon; - if (monc->cur_mon < 0) { - get_random_bytes(&r, 1); - monc->cur_mon = r % monc->monmap->num_mon; - dout("open_session num=%d r=%d -> mon%d\n", - monc->monmap->num_mon, r, monc->cur_mon); - monc->sub_sent = 0; - monc->sub_renew_after = jiffies; /* i.e., expired */ - monc->want_next_osdmap = !!monc->want_next_osdmap; - - dout("open_session mon%d opening\n", monc->cur_mon); - ceph_con_open(&monc->con, - CEPH_ENTITY_TYPE_MON, monc->cur_mon, - &monc->monmap->mon_inst[monc->cur_mon].addr); - - /* send an initial keepalive to ensure our timestamp is - * valid by the time we are in an OPENED state */ - ceph_con_keepalive(&monc->con); - - /* initiatiate authentication handshake */ - ret = ceph_auth_build_hello(monc->auth, - monc->m_auth->front.iov_base, - monc->m_auth->front_alloc_len); - __send_prepared_auth_request(monc, ret); + BUG_ON(monc->monmap->num_mon < 1); + + if (monc->monmap->num_mon == 1) { + monc->cur_mon = 0; } else { - dout("open_session mon%d already open\n", monc->cur_mon); + int max = monc->monmap->num_mon; + int o = -1; + int n; + + if (monc->cur_mon >= 0) { + if (monc->cur_mon < monc->monmap->num_mon) + o = monc->cur_mon; + if (o >= 0) + max--; + } + + n = prandom_u32() % max; + if (o >= 0 && n >= o) + n++; + + monc->cur_mon = n; } - return 0; + + dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, + monc->cur_mon, monc->monmap->num_mon); +} + +/* + * Open a session with a new monitor. + */ +static void __open_session(struct ceph_mon_client *monc) +{ + int ret; + + pick_new_mon(monc); + + monc->hunting = true; + if (monc->had_a_connection) { + monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; + if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) + monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; + } + + monc->sub_renew_after = jiffies; /* i.e., expired */ + monc->sub_renew_sent = 0; + + dout("%s opening mon%d\n", __func__, monc->cur_mon); + ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, + &monc->monmap->mon_inst[monc->cur_mon].addr); + + /* + * send an initial keepalive to ensure our timestamp is valid + * by the time we are in an OPENED state + */ + ceph_con_keepalive(&monc->con); + + /* initiate authentication handshake */ + ret = ceph_auth_build_hello(monc->auth, + monc->m_auth->front.iov_base, + monc->m_auth->front_alloc_len); + BUG_ON(ret <= 0); + __send_prepared_auth_request(monc, ret); } -static bool __sub_expired(struct ceph_mon_client *monc) +static void reopen_session(struct ceph_mon_client *monc) { - return time_after_eq(jiffies, monc->sub_renew_after); + if (!monc->hunting) + pr_info("mon%d %s session lost, hunting for new mon\n", + monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr)); + + __close_session(monc); + __open_session(monc); } /* @@ -174,74 +214,70 @@ static bool __sub_expired(struct ceph_mon_client *monc) */ static void __schedule_delayed(struct ceph_mon_client *monc) { - struct ceph_options *opt = monc->client->options; unsigned long delay; - if (monc->cur_mon < 0 || __sub_expired(monc)) { - delay = 10 * HZ; - } else { - delay = 20 * HZ; - if (opt->monc_ping_timeout > 0) - delay = min(delay, opt->monc_ping_timeout / 3); - } + if (monc->hunting) + delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; + else + delay = CEPH_MONC_PING_INTERVAL; + dout("__schedule_delayed after %lu\n", delay); - schedule_delayed_work(&monc->delayed_work, - round_jiffies_relative(delay)); + mod_delayed_work(system_wq, &monc->delayed_work, + round_jiffies_relative(delay)); } +const char *ceph_sub_str[] = { + [CEPH_SUB_MDSMAP] = "mdsmap", + [CEPH_SUB_MONMAP] = "monmap", + [CEPH_SUB_OSDMAP] = "osdmap", +}; + /* - * Send subscribe request for mdsmap and/or osdmap. + * Send subscribe request for one or more maps, according to + * monc->subs. */ static void __send_subscribe(struct ceph_mon_client *monc) { - dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", - (unsigned int)monc->sub_sent, __sub_expired(monc), - monc->want_next_osdmap); - if ((__sub_expired(monc) && !monc->sub_sent) || - monc->want_next_osdmap == 1) { - struct ceph_msg *msg = monc->m_subscribe; - struct ceph_mon_subscribe_item *i; - void *p, *end; - int num; - - p = msg->front.iov_base; - end = p + msg->front_alloc_len; - - num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; - ceph_encode_32(&p, num); - - if (monc->want_next_osdmap) { - dout("__send_subscribe to 'osdmap' %u\n", - (unsigned int)monc->have_osdmap); - ceph_encode_string(&p, end, "osdmap", 6); - i = p; - i->have = cpu_to_le64(monc->have_osdmap); - i->onetime = 1; - p += sizeof(*i); - monc->want_next_osdmap = 2; /* requested */ - } - if (monc->want_mdsmap) { - dout("__send_subscribe to 'mdsmap' %u+\n", - (unsigned int)monc->have_mdsmap); - ceph_encode_string(&p, end, "mdsmap", 6); - i = p; - i->have = cpu_to_le64(monc->have_mdsmap); - i->onetime = 0; - p += sizeof(*i); - } - ceph_encode_string(&p, end, "monmap", 6); - i = p; - i->have = 0; - i->onetime = 0; - p += sizeof(*i); - - msg->front.iov_len = p - msg->front.iov_base; - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - ceph_msg_revoke(msg); - ceph_con_send(&monc->con, ceph_msg_get(msg)); - - monc->sub_sent = jiffies | 1; /* never 0 */ + struct ceph_msg *msg = monc->m_subscribe; + void *p = msg->front.iov_base; + void *const end = p + msg->front_alloc_len; + int num = 0; + int i; + + dout("%s sent %lu\n", __func__, monc->sub_renew_sent); + + BUG_ON(monc->cur_mon < 0); + + if (!monc->sub_renew_sent) + monc->sub_renew_sent = jiffies | 1; /* never 0 */ + + msg->hdr.version = cpu_to_le16(2); + + for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { + if (monc->subs[i].want) + num++; } + BUG_ON(num < 1); /* monmap sub is always there */ + ceph_encode_32(&p, num); + for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { + const char *s = ceph_sub_str[i]; + + if (!monc->subs[i].want) + continue; + + dout("%s %s start %llu flags 0x%x\n", __func__, s, + le64_to_cpu(monc->subs[i].item.start), + monc->subs[i].item.flags); + ceph_encode_string(&p, end, s, strlen(s)); + memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); + p += sizeof(monc->subs[i].item); + } + + BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19)); + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + ceph_msg_revoke(msg); + ceph_con_send(&monc->con, ceph_msg_get(msg)); } static void handle_subscribe_ack(struct ceph_mon_client *monc, @@ -255,15 +291,16 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, seconds = le32_to_cpu(h->duration); mutex_lock(&monc->mutex); - if (monc->hunting) { - pr_info("mon%d %s session established\n", - monc->cur_mon, - ceph_pr_addr(&monc->con.peer_addr.in_addr)); - monc->hunting = false; + if (monc->sub_renew_sent) { + monc->sub_renew_after = monc->sub_renew_sent + + (seconds >> 1) * HZ - 1; + dout("%s sent %lu duration %d renew after %lu\n", __func__, + monc->sub_renew_sent, seconds, monc->sub_renew_after); + monc->sub_renew_sent = 0; + } else { + dout("%s sent %lu renew after %lu, ignoring\n", __func__, + monc->sub_renew_sent, monc->sub_renew_after); } - dout("handle_subscribe_ack after %d seconds\n", seconds); - monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; - monc->sub_sent = 0; mutex_unlock(&monc->mutex); return; bad: @@ -272,36 +309,82 @@ bad: } /* - * Keep track of which maps we have + * Register interest in a map + * + * @sub: one of CEPH_SUB_* + * @epoch: X for "every map since X", or 0 for "just the latest" */ -int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) +static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, + u32 epoch, bool continuous) +{ + __le64 start = cpu_to_le64(epoch); + u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; + + dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], + epoch, continuous); + + if (monc->subs[sub].want && + monc->subs[sub].item.start == start && + monc->subs[sub].item.flags == flags) + return false; + + monc->subs[sub].item.start = start; + monc->subs[sub].item.flags = flags; + monc->subs[sub].want = true; + + return true; +} + +bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, + bool continuous) { + bool need_request; + mutex_lock(&monc->mutex); - monc->have_mdsmap = got; + need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); mutex_unlock(&monc->mutex); - return 0; + + return need_request; } -EXPORT_SYMBOL(ceph_monc_got_mdsmap); +EXPORT_SYMBOL(ceph_monc_want_map); -int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) +/* + * Keep track of which maps we have + * + * @sub: one of CEPH_SUB_* + */ +static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, + u32 epoch) +{ + dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); + + if (monc->subs[sub].want) { + if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) + monc->subs[sub].want = false; + else + monc->subs[sub].item.start = cpu_to_le64(epoch + 1); + } + + monc->subs[sub].have = epoch; +} + +void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) { mutex_lock(&monc->mutex); - monc->have_osdmap = got; - monc->want_next_osdmap = 0; + __ceph_monc_got_map(monc, sub, epoch); mutex_unlock(&monc->mutex); - return 0; } +EXPORT_SYMBOL(ceph_monc_got_map); /* * Register interest in the next osdmap */ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) { - dout("request_next_osdmap have %u\n", monc->have_osdmap); + dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have); mutex_lock(&monc->mutex); - if (!monc->want_next_osdmap) - monc->want_next_osdmap = 1; - if (monc->want_next_osdmap < 2) + if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, + monc->subs[CEPH_SUB_OSDMAP].have + 1, false)) __send_subscribe(monc); mutex_unlock(&monc->mutex); } @@ -320,15 +403,15 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, long ret; mutex_lock(&monc->mutex); - while (monc->have_osdmap < epoch) { + while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { mutex_unlock(&monc->mutex); if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT; ret = wait_event_interruptible_timeout(monc->client->auth_wq, - monc->have_osdmap >= epoch, - ceph_timeout_jiffies(timeout)); + monc->subs[CEPH_SUB_OSDMAP].have >= epoch, + ceph_timeout_jiffies(timeout)); if (ret < 0) return ret; @@ -341,11 +424,14 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, EXPORT_SYMBOL(ceph_monc_wait_osdmap); /* - * + * Open a session with a random monitor. Request monmap and osdmap, + * which are waited upon in __ceph_open_session(). */ int ceph_monc_open_session(struct ceph_mon_client *monc) { mutex_lock(&monc->mutex); + __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); + __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); __open_session(monc); __schedule_delayed(monc); mutex_unlock(&monc->mutex); @@ -353,29 +439,15 @@ int ceph_monc_open_session(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_open_session); -/* - * We require the fsid and global_id in order to initialize our - * debugfs dir. - */ -static bool have_debugfs_info(struct ceph_mon_client *monc) -{ - dout("have_debugfs_info fsid %d globalid %lld\n", - (int)monc->client->have_fsid, monc->auth->global_id); - return monc->client->have_fsid && monc->auth->global_id > 0; -} - static void ceph_monc_handle_map(struct ceph_mon_client *monc, struct ceph_msg *msg) { struct ceph_client *client = monc->client; struct ceph_monmap *monmap = NULL, *old = monc->monmap; void *p, *end; - int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); - had_debugfs_info = have_debugfs_info(monc); - dout("handle_monmap\n"); p = msg->front.iov_base; end = p + msg->front.iov_len; @@ -395,29 +467,11 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, client->monc.monmap = monmap; kfree(old); - if (!client->have_fsid) { - client->have_fsid = true; - if (!had_debugfs_info && have_debugfs_info(monc)) { - pr_info("client%lld fsid %pU\n", - ceph_client_id(monc->client), - &monc->client->fsid); - init_debugfs = 1; - } - mutex_unlock(&monc->mutex); - - if (init_debugfs) { - /* - * do debugfs initialization without mutex to avoid - * creating a locking dependency - */ - ceph_debugfs_client_init(monc->client); - } + __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); + client->have_fsid = true; - goto out_unlocked; - } out: mutex_unlock(&monc->mutex); -out_unlocked: wake_up_all(&client->auth_wq); } @@ -745,18 +799,15 @@ static void delayed_work(struct work_struct *work) dout("monc delayed_work\n"); mutex_lock(&monc->mutex); if (monc->hunting) { - __close_session(monc); - __open_session(monc); /* continue hunting */ + dout("%s continuing hunt\n", __func__); + reopen_session(monc); } else { - struct ceph_options *opt = monc->client->options; int is_auth = ceph_auth_is_authenticated(monc->auth); if (ceph_con_keepalive_expired(&monc->con, - opt->monc_ping_timeout)) { + CEPH_MONC_PING_TIMEOUT)) { dout("monc keepalive timeout\n"); is_auth = 0; - __close_session(monc); - monc->hunting = true; - __open_session(monc); + reopen_session(monc); } if (!monc->hunting) { @@ -764,8 +815,14 @@ static void delayed_work(struct work_struct *work) __validate_auth(monc); } - if (is_auth) - __send_subscribe(monc); + if (is_auth) { + unsigned long now = jiffies; + + dout("%s renew subs? now %lu renew after %lu\n", + __func__, now, monc->sub_renew_after); + if (time_after_eq(now, monc->sub_renew_after)) + __send_subscribe(monc); + } } __schedule_delayed(monc); mutex_unlock(&monc->mutex); @@ -852,18 +909,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) &monc->client->msgr); monc->cur_mon = -1; - monc->hunting = true; - monc->sub_renew_after = jiffies; - monc->sub_sent = 0; + monc->had_a_connection = false; + monc->hunt_mult = 1; INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); monc->generic_request_tree = RB_ROOT; monc->num_generic_requests = 0; monc->last_tid = 0; - monc->have_mdsmap = 0; - monc->have_osdmap = 0; - monc->want_next_osdmap = 1; return 0; out_auth_reply: @@ -888,7 +941,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc) mutex_lock(&monc->mutex); __close_session(monc); - + monc->cur_mon = -1; mutex_unlock(&monc->mutex); /* @@ -910,26 +963,40 @@ void ceph_monc_stop(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_stop); +static void finish_hunting(struct ceph_mon_client *monc) +{ + if (monc->hunting) { + dout("%s found mon%d\n", __func__, monc->cur_mon); + monc->hunting = false; + monc->had_a_connection = true; + monc->hunt_mult /= 2; /* reduce by 50% */ + if (monc->hunt_mult < 1) + monc->hunt_mult = 1; + } +} + static void handle_auth_reply(struct ceph_mon_client *monc, struct ceph_msg *msg) { int ret; int was_auth = 0; - int had_debugfs_info, init_debugfs = 0; mutex_lock(&monc->mutex); - had_debugfs_info = have_debugfs_info(monc); was_auth = ceph_auth_is_authenticated(monc->auth); monc->pending_auth = 0; ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, msg->front.iov_len, monc->m_auth->front.iov_base, monc->m_auth->front_alloc_len); + if (ret > 0) { + __send_prepared_auth_request(monc, ret); + goto out; + } + + finish_hunting(monc); + if (ret < 0) { monc->client->auth_err = ret; - wake_up_all(&monc->client->auth_wq); - } else if (ret > 0) { - __send_prepared_auth_request(monc, ret); } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { dout("authenticated, starting session\n"); @@ -939,23 +1006,15 @@ static void handle_auth_reply(struct ceph_mon_client *monc, __send_subscribe(monc); __resend_generic_request(monc); - } - if (!had_debugfs_info && have_debugfs_info(monc)) { - pr_info("client%lld fsid %pU\n", - ceph_client_id(monc->client), - &monc->client->fsid); - init_debugfs = 1; + pr_info("mon%d %s session established\n", monc->cur_mon, + ceph_pr_addr(&monc->con.peer_addr.in_addr)); } - mutex_unlock(&monc->mutex); - if (init_debugfs) { - /* - * do debugfs initialization without mutex to avoid - * creating a locking dependency - */ - ceph_debugfs_client_init(monc->client); - } +out: + mutex_unlock(&monc->mutex); + if (monc->client->auth_err < 0) + wake_up_all(&monc->client->auth_wq); } static int __validate_auth(struct ceph_mon_client *monc) @@ -1096,29 +1155,17 @@ static void mon_fault(struct ceph_connection *con) { struct ceph_mon_client *monc = con->private; - if (!monc) - return; - - dout("mon_fault\n"); mutex_lock(&monc->mutex); - if (!con->private) - goto out; - - if (!monc->hunting) - pr_info("mon%d %s session lost, " - "hunting for new mon\n", monc->cur_mon, - ceph_pr_addr(&monc->con.peer_addr.in_addr)); - - __close_session(monc); - if (!monc->hunting) { - /* start hunting */ - monc->hunting = true; - __open_session(monc); - } else { - /* already hunting, let's wait a bit */ - __schedule_delayed(monc); + dout("%s mon%d\n", __func__, monc->cur_mon); + if (monc->cur_mon >= 0) { + if (!monc->hunting) { + dout("%s hunting for new mon\n", __func__); + reopen_session(monc); + __schedule_delayed(monc); + } else { + dout("%s already hunting\n", __func__); + } } -out: mutex_unlock(&monc->mutex); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f8f235930d88..32355d9d0103 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref) ceph_put_snap_context(req->r_snapc); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); - else + else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS) kmem_cache_free(ceph_osd_request_cache, req); - + else + kfree(req); } void ceph_osdc_get_request(struct ceph_osd_request *req) @@ -369,28 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_msg *msg; size_t msg_size; - BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); - BUG_ON(num_ops > CEPH_OSD_MAX_OP); - - msg_size = 4 + 4 + 8 + 8 + 4+8; - msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ - msg_size += 1 + 8 + 4 + 4; /* pg_t */ - msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ - msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); - msg_size += 8; /* snapid */ - msg_size += 8; /* snap_seq */ - msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ - msg_size += 4; - if (use_mempool) { + BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); req = mempool_alloc(osdc->req_mempool, gfp_flags); - memset(req, 0, sizeof(*req)); + } else if (num_ops <= CEPH_OSD_SLAB_OPS) { + req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); } else { - req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); + BUG_ON(num_ops > CEPH_OSD_MAX_OPS); + req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), + gfp_flags); } - if (req == NULL) + if (unlikely(!req)) return NULL; + /* req only, each op is zeroed in _osd_req_op_init() */ + memset(req, 0, sizeof(*req)); + req->r_osdc = osdc; req->r_mempool = use_mempool; req->r_num_ops = num_ops; @@ -408,18 +403,36 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req->r_base_oloc.pool = -1; req->r_target_oloc.pool = -1; + msg_size = OSD_OPREPLY_FRONT_LEN; + if (num_ops > CEPH_OSD_SLAB_OPS) { + /* ceph_osd_op and rval */ + msg_size += (num_ops - CEPH_OSD_SLAB_OPS) * + (sizeof(struct ceph_osd_op) + 4); + } + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, gfp_flags, true); + msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, + gfp_flags, true); if (!msg) { ceph_osdc_put_request(req); return NULL; } req->r_reply = msg; + msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ + msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ + msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += 1 + 8 + 4 + 4; /* pgid */ + msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ + msg_size += 2 + num_ops * sizeof(struct ceph_osd_op); + msg_size += 8; /* snapid */ + msg_size += 8; /* snap_seq */ + msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ + msg_size += 4; /* retry_attempt */ + /* create request message; allow space for oid */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); @@ -498,7 +511,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) payload_len += length; - op->payload_len = payload_len; + op->indata_len = payload_len; } EXPORT_SYMBOL(osd_req_op_extent_init); @@ -517,10 +530,32 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req, BUG_ON(length > previous); op->extent.length = length; - op->payload_len -= previous - length; + op->indata_len -= previous - length; } EXPORT_SYMBOL(osd_req_op_extent_update); +void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, + unsigned int which, u64 offset_inc) +{ + struct ceph_osd_req_op *op, *prev_op; + + BUG_ON(which + 1 >= osd_req->r_num_ops); + + prev_op = &osd_req->r_ops[which]; + op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); + /* dup previous one */ + op->indata_len = prev_op->indata_len; + op->outdata_len = prev_op->outdata_len; + op->extent = prev_op->extent; + /* adjust offset */ + op->extent.offset += offset_inc; + op->extent.length -= offset_inc; + + if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) + op->indata_len -= offset_inc; +} +EXPORT_SYMBOL(osd_req_op_extent_dup_last); + void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { @@ -554,7 +589,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, op->cls.argc = 0; /* currently unused */ - op->payload_len = payload_len; + op->indata_len = payload_len; } EXPORT_SYMBOL(osd_req_op_cls_init); @@ -587,7 +622,7 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, op->xattr.cmp_mode = cmp_mode; ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); - op->payload_len = payload_len; + op->indata_len = payload_len; return 0; } EXPORT_SYMBOL(osd_req_op_xattr_init); @@ -707,7 +742,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); dst->cls.indata_len = cpu_to_le32(data_length); ceph_osdc_msg_data_add(req->r_request, osd_data); - src->payload_len += data_length; + src->indata_len += data_length; request_data_len += data_length; } osd_data = &src->cls.response_data; @@ -750,7 +785,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->op = cpu_to_le16(src->op); dst->flags = cpu_to_le32(src->flags); - dst->payload_len = cpu_to_le32(src->payload_len); + dst->payload_len = cpu_to_le32(src->indata_len); return request_data_len; } @@ -1770,6 +1805,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) u32 osdmap_epoch; int already_completed; u32 bytes; + u8 decode_redir; unsigned int i; tid = le64_to_cpu(msg->hdr.tid); @@ -1809,7 +1845,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_decode_need(&p, end, 4, bad_put); numops = ceph_decode_32(&p); - if (numops > CEPH_OSD_MAX_OP) + if (numops > CEPH_OSD_MAX_OPS) goto bad_put; if (numops != req->r_num_ops) goto bad_put; @@ -1820,7 +1856,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) int len; len = le32_to_cpu(op->payload_len); - req->r_reply_op_len[i] = len; + req->r_ops[i].outdata_len = len; dout(" op %d has %d bytes\n", i, len); payload_len += len; p += sizeof(*op); @@ -1835,12 +1871,21 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_decode_need(&p, end, 4 + numops * 4, bad_put); retry_attempt = ceph_decode_32(&p); for (i = 0; i < numops; i++) - req->r_reply_op_result[i] = ceph_decode_32(&p); + req->r_ops[i].rval = ceph_decode_32(&p); if (le16_to_cpu(msg->hdr.version) >= 6) { p += 8 + 4; /* skip replay_version */ p += 8; /* skip user_version */ + if (le16_to_cpu(msg->hdr.version) >= 7) + ceph_decode_8_safe(&p, end, decode_redir, bad_put); + else + decode_redir = 1; + } else { + decode_redir = 0; + } + + if (decode_redir) { err = ceph_redirect_decode(&p, end, &redir); if (err) goto bad_put; @@ -2177,7 +2222,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) goto bad; done: downgrade_write(&osdc->map_sem); - ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); + ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, + osdc->osdmap->epoch); /* * subscribe to subsequent osdmap updates if full to ensure @@ -2636,8 +2682,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) round_jiffies_relative(osdc->client->options->osd_idle_ttl)); err = -ENOMEM; - osdc->req_mempool = mempool_create_kmalloc_pool(10, - sizeof(struct ceph_osd_request)); + osdc->req_mempool = mempool_create_slab_pool(10, + ceph_osd_request_cache); if (!osdc->req_mempool) goto out; @@ -2772,11 +2818,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages); int ceph_osdc_setup(void) { + size_t size = sizeof(struct ceph_osd_request) + + CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); + BUG_ON(ceph_osd_request_cache); - ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", - sizeof (struct ceph_osd_request), - __alignof__(struct ceph_osd_request), - 0, NULL); + ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, + 0, 0, NULL); return ceph_osd_request_cache ? 0 : -ENOMEM; } @@ -2843,8 +2890,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); if (!req) { - pr_warn("%s osd%d tid %llu unknown, skipping\n", - __func__, osd->o_osd, tid); + dout("%s osd%d tid %llu unknown, skipping\n", __func__, + osd->o_osd, tid); m = NULL; *skip = 1; goto out; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 7d8f581d9f1f..243574c8cf33 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -342,23 +342,32 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->choose_local_tries = ceph_decode_32(p); c->choose_local_fallback_tries = ceph_decode_32(p); c->choose_total_tries = ceph_decode_32(p); - dout("crush decode tunable choose_local_tries = %d", + dout("crush decode tunable choose_local_tries = %d\n", c->choose_local_tries); - dout("crush decode tunable choose_local_fallback_tries = %d", + dout("crush decode tunable choose_local_fallback_tries = %d\n", c->choose_local_fallback_tries); - dout("crush decode tunable choose_total_tries = %d", + dout("crush decode tunable choose_total_tries = %d\n", c->choose_total_tries); ceph_decode_need(p, end, sizeof(u32), done); c->chooseleaf_descend_once = ceph_decode_32(p); - dout("crush decode tunable chooseleaf_descend_once = %d", + dout("crush decode tunable chooseleaf_descend_once = %d\n", c->chooseleaf_descend_once); ceph_decode_need(p, end, sizeof(u8), done); c->chooseleaf_vary_r = ceph_decode_8(p); - dout("crush decode tunable chooseleaf_vary_r = %d", + dout("crush decode tunable chooseleaf_vary_r = %d\n", c->chooseleaf_vary_r); + /* skip straw_calc_version, allowed_bucket_algs */ + ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done); + *p += sizeof(u8) + sizeof(u32); + + ceph_decode_need(p, end, sizeof(u8), done); + c->chooseleaf_stable = ceph_decode_8(p); + dout("crush decode tunable chooseleaf_stable = %d\n", + c->chooseleaf_stable); + done: dout("crush_decode success\n"); return c; diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index d4f5f220a8e5..10297f7a89ba 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -24,7 +24,7 @@ struct page **ceph_get_direct_page_vector(const void __user *data, return ERR_PTR(-ENOMEM); while (got < num_pages) { - rc = get_user_pages_unlocked(current, current->mm, + rc = get_user_pages_unlocked( (unsigned long)data + ((unsigned long)got * PAGE_SIZE), num_pages - got, write_page, 0, pages + got); if (rc < 0) |