Diffstat (limited to 'drivers/block')
-rw-r--r--  drivers/block/DAC960.c             |    4
-rw-r--r--  drivers/block/aoe/aoecmd.c         |   41
-rw-r--r--  drivers/block/drbd/drbd_main.c     |    2
-rw-r--r--  drivers/block/loop.c               |    9
-rw-r--r--  drivers/block/mtip32xx/mtip32xx.c  |    3
-rw-r--r--  drivers/block/nbd.c                |  412
-rw-r--r--  drivers/block/null_blk.c           |  129
-rw-r--r--  drivers/block/rbd.c                | 1459
-rw-r--r--  drivers/block/rbd_types.h          |   11
-rw-r--r--  drivers/block/virtio_blk.c         |   11
-rw-r--r--  drivers/block/xen-blkfront.c       |    1
11 files changed, 1453 insertions, 629 deletions
diff --git a/drivers/block/DAC960.c b/drivers/block/DAC960.c index 811e11c82f32..0809cda93cc0 100644 --- a/drivers/block/DAC960.c +++ b/drivers/block/DAC960.c @@ -2954,7 +2954,7 @@ DAC960_DetectController(struct pci_dev *PCI_Device, case DAC960_PD_Controller: if (!request_region(Controller->IO_Address, 0x80, Controller->FullModelName)) { - DAC960_Error("IO port 0x%d busy for Controller at\n", + DAC960_Error("IO port 0x%lx busy for Controller at\n", Controller, Controller->IO_Address); goto Failure; } @@ -2990,7 +2990,7 @@ DAC960_DetectController(struct pci_dev *PCI_Device, case DAC960_P_Controller: if (!request_region(Controller->IO_Address, 0x80, Controller->FullModelName)){ - DAC960_Error("IO port 0x%d busy for Controller at\n", + DAC960_Error("IO port 0x%lx busy for Controller at\n", Controller, Controller->IO_Address); goto Failure; } diff --git a/drivers/block/aoe/aoecmd.c b/drivers/block/aoe/aoecmd.c index ab19adb07a12..3c606c09fd5a 100644 --- a/drivers/block/aoe/aoecmd.c +++ b/drivers/block/aoe/aoecmd.c @@ -853,45 +853,6 @@ rqbiocnt(struct request *r) return n; } -/* This can be removed if we are certain that no users of the block - * layer will ever use zero-count pages in bios. Otherwise we have to - * protect against the put_page sometimes done by the network layer. - * - * See http://oss.sgi.com/archives/xfs/2007-01/msg00594.html for - * discussion. - * - * We cannot use get_page in the workaround, because it insists on a - * positive page count as a precondition. So we use _refcount directly. - */ -static void -bio_pageinc(struct bio *bio) -{ - struct bio_vec bv; - struct page *page; - struct bvec_iter iter; - - bio_for_each_segment(bv, bio, iter) { - /* Non-zero page count for non-head members of - * compound pages is no longer allowed by the kernel. 
- */ - page = compound_head(bv.bv_page); - page_ref_inc(page); - } -} - -static void -bio_pagedec(struct bio *bio) -{ - struct page *page; - struct bio_vec bv; - struct bvec_iter iter; - - bio_for_each_segment(bv, bio, iter) { - page = compound_head(bv.bv_page); - page_ref_dec(page); - } -} - static void bufinit(struct buf *buf, struct request *rq, struct bio *bio) { @@ -899,7 +860,6 @@ bufinit(struct buf *buf, struct request *rq, struct bio *bio) buf->rq = rq; buf->bio = bio; buf->iter = bio->bi_iter; - bio_pageinc(bio); } static struct buf * @@ -1127,7 +1087,6 @@ aoe_end_buf(struct aoedev *d, struct buf *buf) if (buf == d->ip.buf) d->ip.buf = NULL; rq = buf->rq; - bio_pagedec(buf->bio); mempool_free(buf, d->bufpool); n = (unsigned long) rq->special; rq->special = (void *) --n; diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c index 100be556e613..83482721bc01 100644 --- a/drivers/block/drbd/drbd_main.c +++ b/drivers/block/drbd/drbd_main.c @@ -1871,7 +1871,7 @@ int drbd_send(struct drbd_connection *connection, struct socket *sock, drbd_update_congested(connection); } do { - rv = kernel_sendmsg(sock, &msg, &iov, 1, size); + rv = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); if (rv == -EAGAIN) { if (we_should_drop_the_connection(connection, sock)) break; diff --git a/drivers/block/loop.c b/drivers/block/loop.c index c9f2107f7095..fa1b7a90ba11 100644 --- a/drivers/block/loop.c +++ b/drivers/block/loop.c @@ -840,13 +840,13 @@ static void loop_config_discard(struct loop_device *lo) static void loop_unprepare_queue(struct loop_device *lo) { - flush_kthread_worker(&lo->worker); + kthread_flush_worker(&lo->worker); kthread_stop(lo->worker_task); } static int loop_prepare_queue(struct loop_device *lo) { - init_kthread_worker(&lo->worker); + kthread_init_worker(&lo->worker); lo->worker_task = kthread_run(kthread_worker_fn, &lo->worker, "loop%d", lo->lo_number); if (IS_ERR(lo->worker_task)) @@ -1658,7 +1658,7 @@ static int loop_queue_rq(struct blk_mq_hw_ctx *hctx, break; } - queue_kthread_work(&lo->worker, &cmd->work); + kthread_queue_work(&lo->worker, &cmd->work); return BLK_MQ_RQ_QUEUE_OK; } @@ -1696,14 +1696,13 @@ static int loop_init_request(void *data, struct request *rq, struct loop_cmd *cmd = blk_mq_rq_to_pdu(rq); cmd->rq = rq; - init_kthread_work(&cmd->work, loop_queue_work); + kthread_init_work(&cmd->work, loop_queue_work); return 0; } static struct blk_mq_ops loop_mq_ops = { .queue_rq = loop_queue_rq, - .map_queue = blk_mq_map_queue, .init_request = loop_init_request, }; diff --git a/drivers/block/mtip32xx/mtip32xx.c b/drivers/block/mtip32xx/mtip32xx.c index 2aca98e8e427..3cfd879267b2 100644 --- a/drivers/block/mtip32xx/mtip32xx.c +++ b/drivers/block/mtip32xx/mtip32xx.c @@ -3686,7 +3686,7 @@ static int mtip_block_open(struct block_device *dev, fmode_t mode) return -ENODEV; } -void mtip_block_release(struct gendisk *disk, fmode_t mode) +static void mtip_block_release(struct gendisk *disk, fmode_t mode) { } @@ -3895,7 +3895,6 @@ exit_handler: static struct blk_mq_ops mtip_mq_ops = { .queue_rq = mtip_queue_rq, - .map_queue = blk_mq_map_queue, .init_request = mtip_init_cmd, .exit_request = mtip_free_cmd, .complete = mtip_softirq_done_fn, diff --git a/drivers/block/nbd.c b/drivers/block/nbd.c index a9e398019f38..7a1048755914 100644 --- a/drivers/block/nbd.c +++ b/drivers/block/nbd.c @@ -34,33 +34,29 @@ #include <linux/kthread.h> #include <linux/types.h> #include <linux/debugfs.h> +#include <linux/blk-mq.h> #include <asm/uaccess.h> #include <asm/types.h> #include 
<linux/nbd.h> +#define NBD_TIMEDOUT 0 +#define NBD_DISCONNECT_REQUESTED 1 + struct nbd_device { u32 flags; + unsigned long runtime_flags; struct socket * sock; /* If == NULL, device is not ready, yet */ int magic; - spinlock_t queue_lock; - struct list_head queue_head; /* Requests waiting result */ - struct request *active_req; - wait_queue_head_t active_wq; - struct list_head waiting_queue; /* Requests to be sent */ - wait_queue_head_t waiting_wq; + struct blk_mq_tag_set tag_set; struct mutex tx_lock; struct gendisk *disk; int blksize; loff_t bytesize; - int xmit_timeout; - bool timedout; - bool disconnect; /* a disconnect has been requested by user */ - struct timer_list timeout_timer; /* protects initialization and shutdown of the socket */ spinlock_t sock_lock; struct task_struct *task_recv; @@ -71,6 +67,11 @@ struct nbd_device { #endif }; +struct nbd_cmd { + struct nbd_device *nbd; + struct list_head list; +}; + #if IS_ENABLED(CONFIG_DEBUG_FS) static struct dentry *nbd_dbg_dir; #endif @@ -83,18 +84,6 @@ static unsigned int nbds_max = 16; static struct nbd_device *nbd_dev; static int max_part; -/* - * Use just one lock (or at most 1 per NIC). Two arguments for this: - * 1. Each NIC is essentially a synchronization point for all servers - * accessed through that NIC so there's no need to have more locks - * than NICs anyway. - * 2. More locks lead to more "Dirty cache line bouncing" which will slow - * down each lock to the point where they're actually slower than just - * a single lock. - * Thanks go to Jens Axboe and Al Viro for their LKML emails explaining this! - */ -static DEFINE_SPINLOCK(nbd_lock); - static inline struct device *nbd_to_dev(struct nbd_device *nbd) { return disk_to_dev(nbd->disk); @@ -153,18 +142,16 @@ static int nbd_size_set(struct nbd_device *nbd, struct block_device *bdev, return 0; } -static void nbd_end_request(struct nbd_device *nbd, struct request *req) +static void nbd_end_request(struct nbd_cmd *cmd) { + struct nbd_device *nbd = cmd->nbd; + struct request *req = blk_mq_rq_from_pdu(cmd); int error = req->errors ? -EIO : 0; - struct request_queue *q = req->q; - unsigned long flags; - dev_dbg(nbd_to_dev(nbd), "request %p: %s\n", req, + dev_dbg(nbd_to_dev(nbd), "request %p: %s\n", cmd, error ? 
"failed" : "done"); - spin_lock_irqsave(q->queue_lock, flags); - __blk_end_request_all(req, error); - spin_unlock_irqrestore(q->queue_lock, flags); + blk_mq_complete_request(req, error); } /* @@ -172,40 +159,49 @@ static void nbd_end_request(struct nbd_device *nbd, struct request *req) */ static void sock_shutdown(struct nbd_device *nbd) { - spin_lock_irq(&nbd->sock_lock); + struct socket *sock; + + spin_lock(&nbd->sock_lock); if (!nbd->sock) { - spin_unlock_irq(&nbd->sock_lock); + spin_unlock(&nbd->sock_lock); return; } + sock = nbd->sock; dev_warn(disk_to_dev(nbd->disk), "shutting down socket\n"); - kernel_sock_shutdown(nbd->sock, SHUT_RDWR); - sockfd_put(nbd->sock); nbd->sock = NULL; - spin_unlock_irq(&nbd->sock_lock); + spin_unlock(&nbd->sock_lock); - del_timer(&nbd->timeout_timer); + kernel_sock_shutdown(sock, SHUT_RDWR); + sockfd_put(sock); } -static void nbd_xmit_timeout(unsigned long arg) +static enum blk_eh_timer_return nbd_xmit_timeout(struct request *req, + bool reserved) { - struct nbd_device *nbd = (struct nbd_device *)arg; - unsigned long flags; - - if (list_empty(&nbd->queue_head)) - return; + struct nbd_cmd *cmd = blk_mq_rq_to_pdu(req); + struct nbd_device *nbd = cmd->nbd; + struct socket *sock = NULL; - spin_lock_irqsave(&nbd->sock_lock, flags); + spin_lock(&nbd->sock_lock); - nbd->timedout = true; + set_bit(NBD_TIMEDOUT, &nbd->runtime_flags); - if (nbd->sock) - kernel_sock_shutdown(nbd->sock, SHUT_RDWR); + if (nbd->sock) { + sock = nbd->sock; + get_file(sock->file); + } - spin_unlock_irqrestore(&nbd->sock_lock, flags); + spin_unlock(&nbd->sock_lock); + if (sock) { + kernel_sock_shutdown(sock, SHUT_RDWR); + sockfd_put(sock); + } + req->errors++; dev_err(nbd_to_dev(nbd), "Connection timed out, shutting down connection\n"); + return BLK_EH_HANDLED; } /* @@ -255,9 +251,6 @@ static int sock_xmit(struct nbd_device *nbd, int send, void *buf, int size, tsk_restore_flags(current, pflags, PF_MEMALLOC); - if (!send && nbd->xmit_timeout) - mod_timer(&nbd->timeout_timer, jiffies + nbd->xmit_timeout); - return result; } @@ -273,8 +266,9 @@ static inline int sock_send_bvec(struct nbd_device *nbd, struct bio_vec *bvec, } /* always call with the tx_lock held */ -static int nbd_send_req(struct nbd_device *nbd, struct request *req) +static int nbd_send_cmd(struct nbd_device *nbd, struct nbd_cmd *cmd) { + struct request *req = blk_mq_rq_from_pdu(cmd); int result, flags; struct nbd_request request; unsigned long size = blk_rq_bytes(req); @@ -298,10 +292,10 @@ static int nbd_send_req(struct nbd_device *nbd, struct request *req) request.from = cpu_to_be64((u64)blk_rq_pos(req) << 9); request.len = htonl(size); } - memcpy(request.handle, &req, sizeof(req)); + memcpy(request.handle, &req->tag, sizeof(req->tag)); dev_dbg(nbd_to_dev(nbd), "request %p: sending control (%s@%llu,%uB)\n", - req, nbdcmd_to_ascii(type), + cmd, nbdcmd_to_ascii(type), (unsigned long long)blk_rq_pos(req) << 9, blk_rq_bytes(req)); result = sock_xmit(nbd, 1, &request, sizeof(request), (type == NBD_CMD_WRITE) ? 
MSG_MORE : 0); @@ -323,7 +317,7 @@ static int nbd_send_req(struct nbd_device *nbd, struct request *req) if (!rq_iter_last(bvec, iter)) flags = MSG_MORE; dev_dbg(nbd_to_dev(nbd), "request %p: sending %d bytes data\n", - req, bvec.bv_len); + cmd, bvec.bv_len); result = sock_send_bvec(nbd, &bvec, flags); if (result <= 0) { dev_err(disk_to_dev(nbd->disk), @@ -336,29 +330,6 @@ static int nbd_send_req(struct nbd_device *nbd, struct request *req) return 0; } -static struct request *nbd_find_request(struct nbd_device *nbd, - struct request *xreq) -{ - struct request *req, *tmp; - int err; - - err = wait_event_interruptible(nbd->active_wq, nbd->active_req != xreq); - if (unlikely(err)) - return ERR_PTR(err); - - spin_lock(&nbd->queue_lock); - list_for_each_entry_safe(req, tmp, &nbd->queue_head, queuelist) { - if (req != xreq) - continue; - list_del_init(&req->queuelist); - spin_unlock(&nbd->queue_lock); - return req; - } - spin_unlock(&nbd->queue_lock); - - return ERR_PTR(-ENOENT); -} - static inline int sock_recv_bvec(struct nbd_device *nbd, struct bio_vec *bvec) { int result; @@ -370,11 +341,14 @@ static inline int sock_recv_bvec(struct nbd_device *nbd, struct bio_vec *bvec) } /* NULL returned = something went wrong, inform userspace */ -static struct request *nbd_read_stat(struct nbd_device *nbd) +static struct nbd_cmd *nbd_read_stat(struct nbd_device *nbd) { int result; struct nbd_reply reply; - struct request *req; + struct nbd_cmd *cmd; + struct request *req = NULL; + u16 hwq; + int tag; reply.magic = 0; result = sock_xmit(nbd, 0, &reply, sizeof(reply), MSG_WAITALL); @@ -390,25 +364,27 @@ static struct request *nbd_read_stat(struct nbd_device *nbd) return ERR_PTR(-EPROTO); } - req = nbd_find_request(nbd, *(struct request **)reply.handle); - if (IS_ERR(req)) { - result = PTR_ERR(req); - if (result != -ENOENT) - return ERR_PTR(result); + memcpy(&tag, reply.handle, sizeof(int)); - dev_err(disk_to_dev(nbd->disk), "Unexpected reply (%p)\n", - reply.handle); - return ERR_PTR(-EBADR); + hwq = blk_mq_unique_tag_to_hwq(tag); + if (hwq < nbd->tag_set.nr_hw_queues) + req = blk_mq_tag_to_rq(nbd->tag_set.tags[hwq], + blk_mq_unique_tag_to_tag(tag)); + if (!req || !blk_mq_request_started(req)) { + dev_err(disk_to_dev(nbd->disk), "Unexpected reply (%d) %p\n", + tag, req); + return ERR_PTR(-ENOENT); } + cmd = blk_mq_rq_to_pdu(req); if (ntohl(reply.error)) { dev_err(disk_to_dev(nbd->disk), "Other side returned error (%d)\n", ntohl(reply.error)); req->errors++; - return req; + return cmd; } - dev_dbg(nbd_to_dev(nbd), "request %p: got reply\n", req); + dev_dbg(nbd_to_dev(nbd), "request %p: got reply\n", cmd); if (rq_data_dir(req) != WRITE) { struct req_iterator iter; struct bio_vec bvec; @@ -419,13 +395,13 @@ static struct request *nbd_read_stat(struct nbd_device *nbd) dev_err(disk_to_dev(nbd->disk), "Receive data failed (result %d)\n", result); req->errors++; - return req; + return cmd; } dev_dbg(nbd_to_dev(nbd), "request %p: got %d bytes data\n", - req, bvec.bv_len); + cmd, bvec.bv_len); } } - return req; + return cmd; } static ssize_t pid_show(struct device *dev, @@ -444,7 +420,7 @@ static struct device_attribute pid_attr = { static int nbd_thread_recv(struct nbd_device *nbd, struct block_device *bdev) { - struct request *req; + struct nbd_cmd *cmd; int ret; BUG_ON(nbd->magic != NBD_MAGIC); @@ -460,13 +436,13 @@ static int nbd_thread_recv(struct nbd_device *nbd, struct block_device *bdev) nbd_size_update(nbd, bdev); while (1) { - req = nbd_read_stat(nbd); - if (IS_ERR(req)) { - ret = PTR_ERR(req); + cmd = 
nbd_read_stat(nbd); + if (IS_ERR(cmd)) { + ret = PTR_ERR(cmd); break; } - nbd_end_request(nbd, req); + nbd_end_request(cmd); } nbd_size_clear(nbd, bdev); @@ -475,44 +451,37 @@ static int nbd_thread_recv(struct nbd_device *nbd, struct block_device *bdev) return ret; } -static void nbd_clear_que(struct nbd_device *nbd) +static void nbd_clear_req(struct request *req, void *data, bool reserved) { - struct request *req; + struct nbd_cmd *cmd; + + if (!blk_mq_request_started(req)) + return; + cmd = blk_mq_rq_to_pdu(req); + req->errors++; + nbd_end_request(cmd); +} +static void nbd_clear_que(struct nbd_device *nbd) +{ BUG_ON(nbd->magic != NBD_MAGIC); /* * Because we have set nbd->sock to NULL under the tx_lock, all - * modifications to the list must have completed by now. For - * the same reason, the active_req must be NULL. - * - * As a consequence, we don't need to take the spin lock while - * purging the list here. + * modifications to the list must have completed by now. */ BUG_ON(nbd->sock); - BUG_ON(nbd->active_req); - while (!list_empty(&nbd->queue_head)) { - req = list_entry(nbd->queue_head.next, struct request, - queuelist); - list_del_init(&req->queuelist); - req->errors++; - nbd_end_request(nbd, req); - } - - while (!list_empty(&nbd->waiting_queue)) { - req = list_entry(nbd->waiting_queue.next, struct request, - queuelist); - list_del_init(&req->queuelist); - req->errors++; - nbd_end_request(nbd, req); - } + blk_mq_tagset_busy_iter(&nbd->tag_set, nbd_clear_req, NULL); dev_dbg(disk_to_dev(nbd->disk), "queue cleared\n"); } -static void nbd_handle_req(struct nbd_device *nbd, struct request *req) +static void nbd_handle_cmd(struct nbd_cmd *cmd) { + struct request *req = blk_mq_rq_from_pdu(cmd); + struct nbd_device *nbd = cmd->nbd; + if (req->cmd_type != REQ_TYPE_FS) goto error_out; @@ -526,6 +495,7 @@ static void nbd_handle_req(struct nbd_device *nbd, struct request *req) req->errors = 0; mutex_lock(&nbd->tx_lock); + nbd->task_send = current; if (unlikely(!nbd->sock)) { mutex_unlock(&nbd->tx_lock); dev_err(disk_to_dev(nbd->disk), @@ -533,106 +503,30 @@ static void nbd_handle_req(struct nbd_device *nbd, struct request *req) goto error_out; } - nbd->active_req = req; - - if (nbd->xmit_timeout && list_empty_careful(&nbd->queue_head)) - mod_timer(&nbd->timeout_timer, jiffies + nbd->xmit_timeout); - - if (nbd_send_req(nbd, req) != 0) { + if (nbd_send_cmd(nbd, cmd) != 0) { dev_err(disk_to_dev(nbd->disk), "Request send failed\n"); req->errors++; - nbd_end_request(nbd, req); - } else { - spin_lock(&nbd->queue_lock); - list_add_tail(&req->queuelist, &nbd->queue_head); - spin_unlock(&nbd->queue_lock); + nbd_end_request(cmd); } - nbd->active_req = NULL; + nbd->task_send = NULL; mutex_unlock(&nbd->tx_lock); - wake_up_all(&nbd->active_wq); return; error_out: req->errors++; - nbd_end_request(nbd, req); + nbd_end_request(cmd); } -static int nbd_thread_send(void *data) +static int nbd_queue_rq(struct blk_mq_hw_ctx *hctx, + const struct blk_mq_queue_data *bd) { - struct nbd_device *nbd = data; - struct request *req; - - nbd->task_send = current; - - set_user_nice(current, MIN_NICE); - while (!kthread_should_stop() || !list_empty(&nbd->waiting_queue)) { - /* wait for something to do */ - wait_event_interruptible(nbd->waiting_wq, - kthread_should_stop() || - !list_empty(&nbd->waiting_queue)); - - /* extract request */ - if (list_empty(&nbd->waiting_queue)) - continue; - - spin_lock_irq(&nbd->queue_lock); - req = list_entry(nbd->waiting_queue.next, struct request, - queuelist); - 
list_del_init(&req->queuelist); - spin_unlock_irq(&nbd->queue_lock); - - /* handle request */ - nbd_handle_req(nbd, req); - } + struct nbd_cmd *cmd = blk_mq_rq_to_pdu(bd->rq); - nbd->task_send = NULL; - - return 0; -} - -/* - * We always wait for result of write, for now. It would be nice to make it optional - * in future - * if ((rq_data_dir(req) == WRITE) && (nbd->flags & NBD_WRITE_NOCHK)) - * { printk( "Warning: Ignoring result!\n"); nbd_end_request( req ); } - */ - -static void nbd_request_handler(struct request_queue *q) - __releases(q->queue_lock) __acquires(q->queue_lock) -{ - struct request *req; - - while ((req = blk_fetch_request(q)) != NULL) { - struct nbd_device *nbd; - - spin_unlock_irq(q->queue_lock); - - nbd = req->rq_disk->private_data; - - BUG_ON(nbd->magic != NBD_MAGIC); - - dev_dbg(nbd_to_dev(nbd), "request %p: dequeued (flags=%x)\n", - req, req->cmd_type); - - if (unlikely(!nbd->sock)) { - dev_err_ratelimited(disk_to_dev(nbd->disk), - "Attempted send on closed socket\n"); - req->errors++; - nbd_end_request(nbd, req); - spin_lock_irq(q->queue_lock); - continue; - } - - spin_lock_irq(&nbd->queue_lock); - list_add_tail(&req->queuelist, &nbd->waiting_queue); - spin_unlock_irq(&nbd->queue_lock); - - wake_up(&nbd->waiting_wq); - - spin_lock_irq(q->queue_lock); - } + blk_mq_start_request(bd->rq); + nbd_handle_cmd(cmd); + return BLK_MQ_RQ_QUEUE_OK; } static int nbd_set_socket(struct nbd_device *nbd, struct socket *sock) @@ -657,15 +551,13 @@ out: /* Reset all properties of an NBD device */ static void nbd_reset(struct nbd_device *nbd) { - nbd->disconnect = false; - nbd->timedout = false; + nbd->runtime_flags = 0; nbd->blksize = 1024; nbd->bytesize = 0; set_capacity(nbd->disk, 0); nbd->flags = 0; - nbd->xmit_timeout = 0; + nbd->tag_set.timeout = 0; queue_flag_clear_unlocked(QUEUE_FLAG_DISCARD, nbd->disk->queue); - del_timer_sync(&nbd->timeout_timer); } static void nbd_bdev_reset(struct block_device *bdev) @@ -700,33 +592,37 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, { switch (cmd) { case NBD_DISCONNECT: { - struct request sreq; + struct request *sreq; dev_info(disk_to_dev(nbd->disk), "NBD_DISCONNECT\n"); if (!nbd->sock) return -EINVAL; + sreq = blk_mq_alloc_request(bdev_get_queue(bdev), WRITE, 0); + if (IS_ERR(sreq)) + return -ENOMEM; + mutex_unlock(&nbd->tx_lock); fsync_bdev(bdev); mutex_lock(&nbd->tx_lock); - blk_rq_init(NULL, &sreq); - sreq.cmd_type = REQ_TYPE_DRV_PRIV; + sreq->cmd_type = REQ_TYPE_DRV_PRIV; /* Check again after getting mutex back. 
*/ - if (!nbd->sock) + if (!nbd->sock) { + blk_mq_free_request(sreq); return -EINVAL; + } - nbd->disconnect = true; + set_bit(NBD_DISCONNECT_REQUESTED, &nbd->runtime_flags); - nbd_send_req(nbd, &sreq); + nbd_send_cmd(nbd, blk_mq_rq_to_pdu(sreq)); + blk_mq_free_request(sreq); return 0; } case NBD_CLEAR_SOCK: sock_shutdown(nbd); nbd_clear_que(nbd); - BUG_ON(!list_empty(&nbd->queue_head)); - BUG_ON(!list_empty(&nbd->waiting_queue)); kill_bdev(bdev); return 0; @@ -758,13 +654,7 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, return nbd_size_set(nbd, bdev, nbd->blksize, arg); case NBD_SET_TIMEOUT: - nbd->xmit_timeout = arg * HZ; - if (arg) - mod_timer(&nbd->timeout_timer, - jiffies + nbd->xmit_timeout); - else - del_timer_sync(&nbd->timeout_timer); - + nbd->tag_set.timeout = arg * HZ; return 0; case NBD_SET_FLAGS: @@ -772,7 +662,6 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, return 0; case NBD_DO_IT: { - struct task_struct *thread; int error; if (nbd->task_recv) @@ -786,18 +675,9 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, nbd_parse_flags(nbd, bdev); - thread = kthread_run(nbd_thread_send, nbd, "%s", - nbd_name(nbd)); - if (IS_ERR(thread)) { - mutex_lock(&nbd->tx_lock); - nbd->task_recv = NULL; - return PTR_ERR(thread); - } - nbd_dev_dbg_init(nbd); error = nbd_thread_recv(nbd, bdev); nbd_dev_dbg_close(nbd); - kthread_stop(thread); mutex_lock(&nbd->tx_lock); nbd->task_recv = NULL; @@ -807,9 +687,10 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, kill_bdev(bdev); nbd_bdev_reset(bdev); - if (nbd->disconnect) /* user requested, ignore socket errors */ + /* user requested, ignore socket errors */ + if (test_bit(NBD_DISCONNECT_REQUESTED, &nbd->runtime_flags)) error = 0; - if (nbd->timedout) + if (test_bit(NBD_TIMEDOUT, &nbd->runtime_flags)) error = -ETIMEDOUT; nbd_reset(nbd); @@ -825,10 +706,10 @@ static int __nbd_ioctl(struct block_device *bdev, struct nbd_device *nbd, return 0; case NBD_PRINT_DEBUG: - dev_info(disk_to_dev(nbd->disk), - "next = %p, prev = %p, head = %p\n", - nbd->queue_head.next, nbd->queue_head.prev, - &nbd->queue_head); + /* + * For compatibility only, we no longer keep a list of + * outstanding requests. 
+ */ return 0; } return -ENOTTY; @@ -935,7 +816,7 @@ static int nbd_dev_dbg_init(struct nbd_device *nbd) debugfs_create_file("tasks", 0444, dir, nbd, &nbd_dbg_tasks_ops); debugfs_create_u64("size_bytes", 0444, dir, &nbd->bytesize); - debugfs_create_u32("timeout", 0444, dir, &nbd->xmit_timeout); + debugfs_create_u32("timeout", 0444, dir, &nbd->tag_set.timeout); debugfs_create_u32("blocksize", 0444, dir, &nbd->blksize); debugfs_create_file("flags", 0444, dir, nbd, &nbd_dbg_flags_ops); @@ -987,6 +868,23 @@ static void nbd_dbg_close(void) #endif +static int nbd_init_request(void *data, struct request *rq, + unsigned int hctx_idx, unsigned int request_idx, + unsigned int numa_node) +{ + struct nbd_cmd *cmd = blk_mq_rq_to_pdu(rq); + + cmd->nbd = data; + INIT_LIST_HEAD(&cmd->list); + return 0; +} + +static struct blk_mq_ops nbd_mq_ops = { + .queue_rq = nbd_queue_rq, + .init_request = nbd_init_request, + .timeout = nbd_xmit_timeout, +}; + /* * And here should be modules and kernel interface * (Just smiley confuses emacs :-) @@ -1035,16 +933,34 @@ static int __init nbd_init(void) if (!disk) goto out; nbd_dev[i].disk = disk; + + nbd_dev[i].tag_set.ops = &nbd_mq_ops; + nbd_dev[i].tag_set.nr_hw_queues = 1; + nbd_dev[i].tag_set.queue_depth = 128; + nbd_dev[i].tag_set.numa_node = NUMA_NO_NODE; + nbd_dev[i].tag_set.cmd_size = sizeof(struct nbd_cmd); + nbd_dev[i].tag_set.flags = BLK_MQ_F_SHOULD_MERGE | + BLK_MQ_F_SG_MERGE | BLK_MQ_F_BLOCKING; + nbd_dev[i].tag_set.driver_data = &nbd_dev[i]; + + err = blk_mq_alloc_tag_set(&nbd_dev[i].tag_set); + if (err) { + put_disk(disk); + goto out; + } + /* * The new linux 2.5 block layer implementation requires * every gendisk to have its very own request_queue struct. * These structs are big so we dynamically allocate them. */ - disk->queue = blk_init_queue(nbd_request_handler, &nbd_lock); + disk->queue = blk_mq_init_queue(&nbd_dev[i].tag_set); if (!disk->queue) { + blk_mq_free_tag_set(&nbd_dev[i].tag_set); put_disk(disk); goto out; } + /* * Tell the block layer that we are not a rotational device */ @@ -1069,16 +985,8 @@ static int __init nbd_init(void) for (i = 0; i < nbds_max; i++) { struct gendisk *disk = nbd_dev[i].disk; nbd_dev[i].magic = NBD_MAGIC; - INIT_LIST_HEAD(&nbd_dev[i].waiting_queue); - spin_lock_init(&nbd_dev[i].queue_lock); spin_lock_init(&nbd_dev[i].sock_lock); - INIT_LIST_HEAD(&nbd_dev[i].queue_head); mutex_init(&nbd_dev[i].tx_lock); - init_timer(&nbd_dev[i].timeout_timer); - nbd_dev[i].timeout_timer.function = nbd_xmit_timeout; - nbd_dev[i].timeout_timer.data = (unsigned long)&nbd_dev[i]; - init_waitqueue_head(&nbd_dev[i].active_wq); - init_waitqueue_head(&nbd_dev[i].waiting_wq); disk->major = NBD_MAJOR; disk->first_minor = i << part_shift; disk->fops = &nbd_fops; @@ -1091,6 +999,7 @@ static int __init nbd_init(void) return 0; out: while (i--) { + blk_mq_free_tag_set(&nbd_dev[i].tag_set); blk_cleanup_queue(nbd_dev[i].disk->queue); put_disk(nbd_dev[i].disk); } @@ -1110,6 +1019,7 @@ static void __exit nbd_cleanup(void) if (disk) { del_gendisk(disk); blk_cleanup_queue(disk->queue); + blk_mq_free_tag_set(&nbd_dev[i].tag_set); put_disk(disk); } } diff --git a/drivers/block/null_blk.c b/drivers/block/null_blk.c index 75a7f88d6717..ba6f4a2e73db 100644 --- a/drivers/block/null_blk.c +++ b/drivers/block/null_blk.c @@ -34,6 +34,7 @@ struct nullb { unsigned int index; struct request_queue *q; struct gendisk *disk; + struct nvm_dev *ndev; struct blk_mq_tag_set tag_set; struct hrtimer timer; unsigned int queue_depth; @@ -393,7 +394,6 @@ static int 
null_init_hctx(struct blk_mq_hw_ctx *hctx, void *data, static struct blk_mq_ops null_mq_ops = { .queue_rq = null_queue_rq, - .map_queue = blk_mq_map_queue, .init_hctx = null_init_hctx, .complete = null_softirq_done_fn, }; @@ -414,23 +414,6 @@ static void cleanup_queues(struct nullb *nullb) kfree(nullb->queues); } -static void null_del_dev(struct nullb *nullb) -{ - list_del_init(&nullb->list); - - if (use_lightnvm) - nvm_unregister(nullb->disk_name); - else - del_gendisk(nullb->disk); - blk_cleanup_queue(nullb->q); - if (queue_mode == NULL_Q_MQ) - blk_mq_free_tag_set(&nullb->tag_set); - if (!use_lightnvm) - put_disk(nullb->disk); - cleanup_queues(nullb); - kfree(nullb); -} - #ifdef CONFIG_NVM static void null_lnvm_end_io(struct request *rq, int error) @@ -564,10 +547,58 @@ static struct nvm_dev_ops null_lnvm_dev_ops = { /* Simulate nvme protocol restriction */ .max_phys_sect = 64, }; + +static int null_nvm_register(struct nullb *nullb) +{ + struct nvm_dev *dev; + int rv; + + dev = nvm_alloc_dev(0); + if (!dev) + return -ENOMEM; + + dev->q = nullb->q; + memcpy(dev->name, nullb->disk_name, DISK_NAME_LEN); + dev->ops = &null_lnvm_dev_ops; + + rv = nvm_register(dev); + if (rv) { + kfree(dev); + return rv; + } + nullb->ndev = dev; + return 0; +} + +static void null_nvm_unregister(struct nullb *nullb) +{ + nvm_unregister(nullb->ndev); +} #else -static struct nvm_dev_ops null_lnvm_dev_ops; +static int null_nvm_register(struct nullb *nullb) +{ + return -EINVAL; +} +static void null_nvm_unregister(struct nullb *nullb) {} #endif /* CONFIG_NVM */ +static void null_del_dev(struct nullb *nullb) +{ + list_del_init(&nullb->list); + + if (use_lightnvm) + null_nvm_unregister(nullb); + else + del_gendisk(nullb->disk); + blk_cleanup_queue(nullb->q); + if (queue_mode == NULL_Q_MQ) + blk_mq_free_tag_set(&nullb->tag_set); + if (!use_lightnvm) + put_disk(nullb->disk); + cleanup_queues(nullb); + kfree(nullb); +} + static int null_open(struct block_device *bdev, fmode_t mode) { return 0; @@ -640,11 +671,32 @@ static int init_driver_queues(struct nullb *nullb) return 0; } -static int null_add_dev(void) +static int null_gendisk_register(struct nullb *nullb) { struct gendisk *disk; - struct nullb *nullb; sector_t size; + + disk = nullb->disk = alloc_disk_node(1, home_node); + if (!disk) + return -ENOMEM; + size = gb * 1024 * 1024 * 1024ULL; + set_capacity(disk, size >> 9); + + disk->flags |= GENHD_FL_EXT_DEVT | GENHD_FL_SUPPRESS_PARTITION_INFO; + disk->major = null_major; + disk->first_minor = nullb->index; + disk->fops = &null_fops; + disk->private_data = nullb; + disk->queue = nullb->q; + strncpy(disk->disk_name, nullb->disk_name, DISK_NAME_LEN); + + add_disk(disk); + return 0; +} + +static int null_add_dev(void) +{ + struct nullb *nullb; int rv; nullb = kzalloc_node(sizeof(*nullb), GFP_KERNEL, home_node); @@ -716,42 +768,19 @@ static int null_add_dev(void) sprintf(nullb->disk_name, "nullb%d", nullb->index); - if (use_lightnvm) { - rv = nvm_register(nullb->q, nullb->disk_name, - &null_lnvm_dev_ops); - if (rv) - goto out_cleanup_blk_queue; - goto done; - } - - disk = nullb->disk = alloc_disk_node(1, home_node); - if (!disk) { - rv = -ENOMEM; - goto out_cleanup_lightnvm; - } - size = gb * 1024 * 1024 * 1024ULL; - set_capacity(disk, size >> 9); - - disk->flags |= GENHD_FL_EXT_DEVT | GENHD_FL_SUPPRESS_PARTITION_INFO; - disk->major = null_major; - disk->first_minor = nullb->index; - disk->fops = &null_fops; - disk->private_data = nullb; - disk->queue = nullb->q; - strncpy(disk->disk_name, nullb->disk_name, 
DISK_NAME_LEN); + if (use_lightnvm) + rv = null_nvm_register(nullb); + else + rv = null_gendisk_register(nullb); - add_disk(disk); + if (rv) + goto out_cleanup_blk_queue; -done: mutex_lock(&lock); list_add_tail(&nullb->list, &nullb_list); mutex_unlock(&lock); return 0; - -out_cleanup_lightnvm: - if (use_lightnvm) - nvm_unregister(nullb->disk_name); out_cleanup_blk_queue: blk_cleanup_queue(nullb->q); out_cleanup_tags: diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6c6519f6492a..7b274ff4632c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include <linux/ceph/libceph.h> #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> +#include <linux/ceph/cls_lock_client.h> #include <linux/ceph/decode.h> #include <linux/parser.h> #include <linux/bsearch.h> @@ -114,12 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_OBJ_PREFIX_LEN_MAX 64 +#define RBD_NOTIFY_TIMEOUT 5 /* seconds */ +#define RBD_RETRY_DELAY msecs_to_jiffies(1000) + /* Feature bits */ #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) -#define RBD_FEATURES_ALL \ - (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) +#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ + RBD_FEATURE_STRIPINGV2 | \ + RBD_FEATURE_EXCLUSIVE_LOCK) /* Features supported by this (client software) implementation. */ @@ -128,11 +134,8 @@ static int atomic_dec_return_safe(atomic_t *v) /* * An RBD device name will be "rbd#", where the "rbd" comes from * RBD_DRV_NAME above, and # is a unique integer identifier. - * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big - * enough to hold all possible device names. */ #define DEV_NAME_LEN 32 -#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) /* * block device image metadata (in-memory version) @@ -322,6 +325,24 @@ struct rbd_img_request { #define for_each_obj_request_safe(ireq, oreq, n) \ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) +enum rbd_watch_state { + RBD_WATCH_STATE_UNREGISTERED, + RBD_WATCH_STATE_REGISTERED, + RBD_WATCH_STATE_ERROR, +}; + +enum rbd_lock_state { + RBD_LOCK_STATE_UNLOCKED, + RBD_LOCK_STATE_LOCKED, + RBD_LOCK_STATE_RELEASING, +}; + +/* WatchNotify::ClientId */ +struct rbd_client_id { + u64 gid; + u64 handle; +}; + struct rbd_mapping { u64 size; u64 features; @@ -349,13 +370,29 @@ struct rbd_device { unsigned long flags; /* possibly lock protected */ struct rbd_spec *spec; struct rbd_options *opts; + char *config_info; /* add{,_single_major} string */ struct ceph_object_id header_oid; struct ceph_object_locator header_oloc; - struct ceph_file_layout layout; + struct ceph_file_layout layout; /* used for all rbd requests */ + struct mutex watch_mutex; + enum rbd_watch_state watch_state; struct ceph_osd_linger_request *watch_handle; + u64 watch_cookie; + struct delayed_work watch_dwork; + + struct rw_semaphore lock_rwsem; + enum rbd_lock_state lock_state; + struct rbd_client_id owner_cid; + struct work_struct acquired_lock_work; + struct work_struct released_lock_work; + struct delayed_work lock_dwork; + struct work_struct unlock_work; + wait_queue_head_t lock_waitq; + + struct workqueue_struct *task_wq; struct rbd_spec *parent_spec; u64 parent_overlap; @@ -378,15 +415,15 @@ struct rbd_device { }; /* - * Flag bits for rbd_dev->flags. If atomicity is required, - * rbd_dev->lock is used to protect access. - * - * Currently, only the "removing" flag (which is coupled with the - * "open_count" field) requires atomic access. 
+ * Flag bits for rbd_dev->flags: + * - REMOVING (which is coupled with rbd_dev->open_count) is protected + * by rbd_dev->lock + * - BLACKLISTED is protected by rbd_dev->lock_rwsem */ enum rbd_dev_flags { RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ + RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ }; static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ @@ -439,6 +476,29 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_lock_supported(struct rbd_device *rbd_dev) +{ + return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && + rbd_dev->spec->snap_id == CEPH_NOSNAP && + !rbd_dev->mapping.read_only; +} + +static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || + rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; +} + +static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + bool is_lock_owner; + + down_read(&rbd_dev->lock_rwsem); + is_lock_owner = __rbd_is_lock_owner(rbd_dev); + up_read(&rbd_dev->lock_rwsem); + return is_lock_owner; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); @@ -735,6 +795,7 @@ enum { /* string args above */ Opt_read_only, Opt_read_write, + Opt_lock_on_read, Opt_err }; @@ -746,16 +807,19 @@ static match_table_t rbd_opts_tokens = { {Opt_read_only, "ro"}, /* Alternate spelling */ {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ + {Opt_lock_on_read, "lock_on_read"}, {Opt_err, NULL} }; struct rbd_options { int queue_depth; bool read_only; + bool lock_on_read; }; #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ #define RBD_READ_ONLY_DEFAULT false +#define RBD_LOCK_ON_READ_DEFAULT false static int parse_rbd_opts_token(char *c, void *private) { @@ -791,6 +855,9 @@ static int parse_rbd_opts_token(char *c, void *private) case Opt_read_write: rbd_opts->read_only = false; break; + case Opt_lock_on_read: + rbd_opts->lock_on_read = true; + break; default: /* libceph prints "bad option" msg */ return -EINVAL; @@ -919,7 +986,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, char *snap_names = NULL; u64 *snap_sizes = NULL; u32 snap_count; - size_t size; int ret = -ENOMEM; u32 i; @@ -957,9 +1023,9 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, goto out_err; /* ...as well as the array of their sizes. 
*/ - - size = snap_count * sizeof (*header->snap_sizes); - snap_sizes = kmalloc(size, GFP_KERNEL); + snap_sizes = kmalloc_array(snap_count, + sizeof(*header->snap_sizes), + GFP_KERNEL); if (!snap_sizes) goto out_err; @@ -1551,11 +1617,18 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static int rbd_obj_request_submit(struct ceph_osd_client *osdc, - struct rbd_obj_request *obj_request) +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); + +static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { - dout("%s %p\n", __func__, obj_request); - return ceph_osdc_start_request(osdc, obj_request->osd_req, false); + struct ceph_osd_request *osd_req = obj_request->osd_req; + + dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } + ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } static void rbd_obj_request_end(struct rbd_obj_request *obj_request) @@ -1745,6 +1818,22 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) complete_all(&obj_request->completion); } +static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) +{ + obj_request->result = err; + obj_request->xferred = 0; + /* + * kludge - mirror rbd_obj_request_submit() to match a put in + * rbd_img_obj_callback() + */ + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } + obj_request_done_set(obj_request); + rbd_obj_request_complete(obj_request); +} + static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = NULL; @@ -1877,11 +1966,10 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_osd_request *osd_req = obj_request->osd_req; - if (img_request) - osd_req->r_snapid = img_request->snap_id; + rbd_assert(obj_request_img_data_test(obj_request)); + osd_req->r_snapid = obj_request->img_request->snap_id; } static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) @@ -2074,7 +2162,9 @@ static void rbd_obj_request_destroy(struct kref *kref) bio_chain_put(obj_request->bio_list); break; case OBJ_REQUEST_PAGES: - if (obj_request->pages) + /* img_data requests don't own their page array */ + if (obj_request->pages && + !obj_request_img_data_test(obj_request)) ceph_release_page_vector(obj_request->pages, obj_request->page_count); break; @@ -2295,13 +2385,6 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) xferred = obj_request->length; } - /* Image object requests don't own their page array */ - - if (obj_request->type == OBJ_REQUEST_PAGES) { - obj_request->pages = NULL; - obj_request->page_count = 0; - } - if (img_request_child_test(img_request)) { rbd_assert(img_request->obj_request != NULL); more = obj_request->which < img_request->obj_request_count - 1; @@ -2520,8 +2603,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); - rbd_img_request_get(img_request); - img_offset += length; resid -= length; } @@ -2579,7 +2660,6 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) { struct rbd_obj_request 
*orig_request; struct ceph_osd_request *osd_req; - struct ceph_osd_client *osdc; struct rbd_device *rbd_dev; struct page **pages; enum obj_operation_type op_type; @@ -2603,7 +2683,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) rbd_assert(obj_request_type_valid(orig_request->type)); img_result = img_request->result; parent_length = img_request->length; - rbd_assert(parent_length == img_request->xferred); + rbd_assert(img_result || parent_length == img_request->xferred); rbd_img_request_put(img_request); rbd_assert(orig_request->img_request); @@ -2616,13 +2696,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) * and re-submit the original write request. */ if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - ceph_release_page_vector(pages, page_count); - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; + rbd_obj_request_submit(orig_request); + return; } if (img_result) @@ -2656,17 +2732,12 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) /* All set, send it off. */ - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; -out_err: - /* Record the error code and complete the request */ + rbd_obj_request_submit(orig_request); + return; - orig_request->result = img_result; - orig_request->xferred = 0; - obj_request_done_set(orig_request); - rbd_obj_request_complete(orig_request); +out_err: + ceph_release_page_vector(pages, page_count); + rbd_obj_request_error(orig_request, img_result); } /* @@ -2680,26 +2751,19 @@ out_err: * When the read completes, this page array will be transferred to * the original object request for the copyup operation. * - * If an error occurs, record it as the result of the original - * object request and mark it done so it gets completed. + * If an error occurs, it is recorded as the result of the original + * object request in rbd_img_obj_exists_callback(). 
*/ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = NULL; + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_img_request *parent_request = NULL; - struct rbd_device *rbd_dev; u64 img_offset; u64 length; struct page **pages = NULL; u32 page_count; int result; - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - - img_request = obj_request->img_request; - rbd_assert(img_request != NULL); - rbd_dev = img_request->rbd_dev; rbd_assert(rbd_dev->parent != NULL); /* @@ -2740,10 +2804,11 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); if (result) goto out_err; + parent_request->copyup_pages = pages; parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); if (!result) return 0; @@ -2757,10 +2822,6 @@ out_err: ceph_release_page_vector(pages, page_count); if (parent_request) rbd_img_request_put(parent_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); - return result; } @@ -2793,17 +2854,13 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) /* * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. + * image has been flattened) we need to re-submit the + * original request. */ rbd_dev = orig_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - result = rbd_obj_request_submit(osdc, orig_request); - if (!result) - return; + rbd_obj_request_submit(orig_request); + return; } /* @@ -2816,31 +2873,45 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) obj_request_existence_set(orig_request, true); } else if (result == -ENOENT) { obj_request_existence_set(orig_request, false); - } else if (result) { - orig_request->result = result; - goto out; + } else { + goto fail_orig_request; } /* * Resubmit the original request now that we have recorded * whether the target object exists. 
*/ - orig_request->result = rbd_img_obj_request_submit(orig_request); -out: - if (orig_request->result) - rbd_obj_request_complete(orig_request); + result = rbd_img_obj_request_submit(orig_request); + if (result) + goto fail_orig_request; + + return; + +fail_orig_request: + rbd_obj_request_error(orig_request, result); } static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) { + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_obj_request *stat_request; - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - struct page **pages = NULL; + struct page **pages; u32 page_count; size_t size; int ret; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + return -ENOMEM; + + stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, + stat_request); + if (!stat_request->osd_req) { + ret = -ENOMEM; + goto fail_stat_request; + } + /* * The response data for a STAT call consists of: * le64 length; @@ -2852,52 +2923,33 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); page_count = (u32)calc_pages_for(0, size); pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto fail_stat_request; + } - ret = -ENOMEM; - stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, - OBJ_REQUEST_PAGES); - if (!stat_request) - goto out; + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); rbd_obj_request_get(obj_request); stat_request->obj_request = obj_request; stat_request->pages = pages; stat_request->page_count = page_count; - - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) - goto out; stat_request->callback = rbd_img_obj_exists_callback; - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); - rbd_osd_req_format_read(stat_request); - - osdc = &rbd_dev->rbd_client->client->osdc; - ret = rbd_obj_request_submit(osdc, stat_request); -out: - if (ret) - rbd_obj_request_put(obj_request); + rbd_obj_request_submit(stat_request); + return 0; +fail_stat_request: + rbd_obj_request_put(stat_request); return ret; } static bool img_obj_request_simple(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - - rbd_assert(obj_request_img_data_test(obj_request)); - - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_dev = img_request->rbd_dev; + struct rbd_img_request *img_request = obj_request->img_request; + struct rbd_device *rbd_dev = img_request->rbd_dev; /* Reads */ if (!img_request_write_test(img_request) && @@ -2936,14 +2988,13 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request) static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { - if (img_obj_request_simple(obj_request)) { - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - - rbd_dev = obj_request->img_request->rbd_dev; - osdc = &rbd_dev->rbd_client->client->osdc; + rbd_assert(obj_request_img_data_test(obj_request)); + 
rbd_assert(obj_request_type_valid(obj_request->type)); + rbd_assert(obj_request->img_request); - return rbd_obj_request_submit(osdc, obj_request); + if (img_obj_request_simple(obj_request)) { + rbd_obj_request_submit(obj_request); + return 0; } /* @@ -3006,12 +3057,8 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) rbd_assert(obj_request->img_request); rbd_dev = obj_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, obj_request); - if (!img_result) - return; + rbd_obj_request_submit(obj_request); + return; } obj_request->result = img_result; @@ -3084,65 +3131,724 @@ out_err: obj_request_done_set(obj_request); } -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev); +static const struct rbd_client_id rbd_empty_cid; -static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, - u64 notifier_id, void *data, size_t data_len) +static bool rbd_cid_equal(const struct rbd_client_id *lhs, + const struct rbd_client_id *rhs) +{ + return lhs->gid == rhs->gid && lhs->handle == rhs->handle; +} + +static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) +{ + struct rbd_client_id cid; + + mutex_lock(&rbd_dev->watch_mutex); + cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); + cid.handle = rbd_dev->watch_cookie; + mutex_unlock(&rbd_dev->watch_mutex); + return cid; +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_set_owner_cid(struct rbd_device *rbd_dev, + const struct rbd_client_id *cid) +{ + dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, + cid->gid, cid->handle); + rbd_dev->owner_cid = *cid; /* struct */ +} + +static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) +{ + mutex_lock(&rbd_dev->watch_mutex); + sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); + mutex_unlock(&rbd_dev->watch_mutex); +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_lock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + char cookie[32]; + int ret; + + WARN_ON(__rbd_is_lock_owner(rbd_dev)); + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, + RBD_LOCK_TAG, "", 0); + if (ret) + return ret; + + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + rbd_set_owner_cid(rbd_dev, &cid); + queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); + return 0; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + char cookie[32]; + int ret; + + WARN_ON(!__rbd_is_lock_owner(rbd_dev)); + + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, cookie); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "cls_unlock failed: %d", ret); + return ret; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); + return 0; +} + +static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op, + struct page 
***preply_pages, + size_t *preply_len) { - struct rbd_device *rbd_dev = arg; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + void *p = buf; + + dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); + + /* encode *LockPayload NotifyMessage (op + ClientId) */ + ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, notify_op); + ceph_encode_64(&p, cid.gid); + ceph_encode_64(&p, cid.handle); + + return ceph_osdc_notify(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, buf, buf_size, + RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); +} + +static void rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op) +{ + struct page **reply_pages; + size_t reply_len; + + __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); +} + +static void rbd_notify_acquired_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + acquired_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); +} + +static void rbd_notify_released_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + released_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); +} + +static int rbd_request_lock(struct rbd_device *rbd_dev) +{ + struct page **reply_pages; + size_t reply_len; + bool lock_owner_responded = false; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, + &reply_pages, &reply_len); + if (ret && ret != -ETIMEDOUT) { + rbd_warn(rbd_dev, "failed to request lock: %d", ret); + goto out; + } + + if (reply_len > 0 && reply_len <= PAGE_SIZE) { + void *p = page_address(reply_pages[0]); + void *const end = p + reply_len; + u32 n; + + ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ + while (n--) { + u8 struct_v; + u32 len; + + ceph_decode_need(&p, end, 8 + 8, e_inval); + p += 8 + 8; /* skip gid and cookie */ + + ceph_decode_32_safe(&p, end, len, e_inval); + if (!len) + continue; + + if (lock_owner_responded) { + rbd_warn(rbd_dev, + "duplicate lock owners detected"); + ret = -EIO; + goto out; + } + + lock_owner_responded = true; + ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, + "failed to decode ResponseMessage: %d", + ret); + goto e_inval; + } + + ret = ceph_decode_32(&p); + } + } + + if (!lock_owner_responded) { + rbd_warn(rbd_dev, "no lock owners detected"); + ret = -ETIMEDOUT; + } + +out: + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +{ + dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + + cancel_delayed_work(&rbd_dev->lock_dwork); + if (wake_all) + wake_up_all(&rbd_dev->lock_waitq); + else + wake_up(&rbd_dev->lock_waitq); +} + +static int get_lock_owner_info(struct rbd_device *rbd_dev, + struct ceph_locker **lockers, u32 *num_lockers) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + u8 lock_type; + char *lock_tag; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, 
RBD_LOCK_NAME, + &lock_type, &lock_tag, lockers, num_lockers); + if (ret) + return ret; + + if (*num_lockers == 0) { + dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + goto out; + } + + if (strcmp(lock_tag, RBD_LOCK_TAG)) { + rbd_warn(rbd_dev, "locked by external mechanism, tag %s", + lock_tag); + ret = -EBUSY; + goto out; + } + + if (lock_type == CEPH_CLS_LOCK_SHARED) { + rbd_warn(rbd_dev, "shared lock type detected"); + ret = -EBUSY; + goto out; + } + + if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, + strlen(RBD_LOCK_COOKIE_PREFIX))) { + rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", + (*lockers)[0].id.cookie); + ret = -EBUSY; + goto out; + } + +out: + kfree(lock_tag); + return ret; +} + +static int find_watcher(struct rbd_device *rbd_dev, + const struct ceph_locker *locker) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_watch_item *watchers; + u32 num_watchers; + u64 cookie; + int i; + int ret; + + ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, &watchers, + &num_watchers); + if (ret) + return ret; + + sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); + for (i = 0; i < num_watchers; i++) { + if (!memcmp(&watchers[i].addr, &locker->info.addr, + sizeof(locker->info.addr)) && + watchers[i].cookie == cookie) { + struct rbd_client_id cid = { + .gid = le64_to_cpu(watchers[i].name.num), + .handle = cookie, + }; + + dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, + rbd_dev, cid.gid, cid.handle); + rbd_set_owner_cid(rbd_dev, &cid); + ret = 1; + goto out; + } + } + + dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); + ret = 0; +out: + kfree(watchers); + return ret; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_try_lock(struct rbd_device *rbd_dev) +{ + struct ceph_client *client = rbd_dev->rbd_client->client; + struct ceph_locker *lockers; + u32 num_lockers; + int ret; + + for (;;) { + ret = rbd_lock(rbd_dev); + if (ret != -EBUSY) + return ret; + + /* determine if the current lock holder is still alive */ + ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); + if (ret) + return ret; + + if (num_lockers == 0) + goto again; + + ret = find_watcher(rbd_dev, lockers); + if (ret) { + if (ret > 0) + ret = 0; /* have to request lock */ + goto out; + } + + rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_monc_blacklist_add(&client->monc, + &lockers[0].info.addr); + if (ret) { + rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", + ENTITY_NAME(lockers[0].id.name), ret); + goto out; + } + + ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + lockers[0].id.cookie, + &lockers[0].id.name); + if (ret && ret != -ENOENT) + goto out; + +again: + ceph_free_lockers(lockers, num_lockers); + } + +out: + ceph_free_lockers(lockers, num_lockers); + return ret; +} + +/* + * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + */ +static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, + int *pret) +{ + enum rbd_lock_state lock_state; + + down_read(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (__rbd_is_lock_owner(rbd_dev)) { + lock_state = rbd_dev->lock_state; + up_read(&rbd_dev->lock_rwsem); + return lock_state; + } + + up_read(&rbd_dev->lock_rwsem); + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, 
rbd_dev, + rbd_dev->lock_state); + if (!__rbd_is_lock_owner(rbd_dev)) { + *pret = rbd_try_lock(rbd_dev); + if (*pret) + rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + } + + lock_state = rbd_dev->lock_state; + up_write(&rbd_dev->lock_rwsem); + return lock_state; +} + +static void rbd_acquire_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, lock_dwork); + enum rbd_lock_state lock_state; int ret; - dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, - cookie, notify_id); + dout("%s rbd_dev %p\n", __func__, rbd_dev); +again: + lock_state = rbd_try_acquire_lock(rbd_dev, &ret); + if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { + if (lock_state == RBD_LOCK_STATE_LOCKED) + wake_requests(rbd_dev, true); + dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, + rbd_dev, lock_state, ret); + return; + } + + ret = rbd_request_lock(rbd_dev); + if (ret == -ETIMEDOUT) { + goto again; /* treat this as a dead client */ + } else if (ret < 0) { + rbd_warn(rbd_dev, "error requesting lock: %d", ret); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + RBD_RETRY_DELAY); + } else { + /* + * lock owner acked, but resend if we don't see them + * release the lock + */ + dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); + } +} + +/* + * lock_rwsem must be held for write + */ +static bool rbd_release_lock(struct rbd_device *rbd_dev) +{ + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + return false; + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + downgrade_write(&rbd_dev->lock_rwsem); /* - * Until adequate refresh error handling is in place, there is - * not much we can do here, except warn. + * Ensure that all in-flight IO is flushed. * - * See http://tracker.ceph.com/issues/5040 + * FIXME: ceph_osdc_sync() flushes the entire OSD client, which + * may be shared with other devices. */ - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "refresh failed: %d", ret); + ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + up_read(&rbd_dev->lock_rwsem); + + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) + return false; + + if (!rbd_unlock(rbd_dev)) + /* + * Give others a chance to grab the lock - we would re-acquire + * almost immediately if we got new IO during ceph_osdc_sync() + * otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_wait_state_locked() + * after wake_requests() in rbd_handle_released_lock(). 
+ */ + cancel_delayed_work(&rbd_dev->lock_dwork); + + return true; +} + +static void rbd_release_lock_work(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + unlock_work); + + down_write(&rbd_dev->lock_rwsem); + rbd_release_lock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + /* + * we already know that the remote client is + * the owner + */ + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + __func__, rbd_dev, cid.gid, cid.handle, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); + struct rbd_client_id cid = { 0 }; + bool need_to_send; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (rbd_cid_equal(&cid, &my_cid)) + return false; + + down_read(&rbd_dev->lock_rwsem); + need_to_send = __rbd_is_lock_owner(rbd_dev); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { + if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) { + dout("%s rbd_dev %p queueing unlock_work\n", __func__, + rbd_dev); + queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work); + } + } + up_read(&rbd_dev->lock_rwsem); + return need_to_send; +} + +static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 *result) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + int ret; + + if (result) { + void *p = buf; + + /* encode ResponseMessage */ + ceph_start_encoding(&p, 1, 1, + buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, *result); + } else { + buf_size = 0; + } ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, notify_id, cookie, - NULL, 0); + buf, buf_size); if (ret) - rbd_warn(rbd_dev, "notify_ack ret %d", ret); + rbd_warn(rbd_dev, 
"acknowledge_notify failed: %d", ret); } -static void rbd_watch_errcb(void *arg, u64 cookie, int err) +static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, + u64 cookie) +{ + dout("%s rbd_dev %p\n", __func__, rbd_dev); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); +} + +static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 result) +{ + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); +} + +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) { struct rbd_device *rbd_dev = arg; + void *p = data; + void *const end = p + data_len; + u8 struct_v; + u32 len; + u32 notify_op; int ret; - rbd_warn(rbd_dev, "encountered watch error: %d", err); + dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", + __func__, rbd_dev, cookie, notify_id, data_len); + if (data_len) { + ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", + ret); + return; + } - __rbd_dev_header_unwatch_sync(rbd_dev); + notify_op = ceph_decode_32(&p); + } else { + /* legacy notification for header updates */ + notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; + len = 0; + } - ret = rbd_dev_header_watch_sync(rbd_dev); - if (ret) { - rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - return; + dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); + switch (notify_op) { + case RBD_NOTIFY_OP_ACQUIRED_LOCK: + rbd_handle_acquired_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_RELEASED_LOCK: + rbd_handle_released_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_REQUEST_LOCK: + if (rbd_handle_request_lock(rbd_dev, struct_v, &p)) + /* + * send ResponseMessage(0) back so the client + * can detect a missing owner + */ + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, 0); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_HEADER_UPDATE: + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "refresh failed: %d", ret); + + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + default: + if (rbd_is_lock_owner(rbd_dev)) + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, -EOPNOTSUPP); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; } +} - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); +static void __rbd_unregister_watch(struct rbd_device *rbd_dev); + +static void rbd_watch_errcb(void *arg, u64 cookie, int err) +{ + struct rbd_device *rbd_dev = arg; + + rbd_warn(rbd_dev, "encountered watch error: %d", err); + + down_write(&rbd_dev->lock_rwsem); + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + up_write(&rbd_dev->lock_rwsem); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; + + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); + } + mutex_unlock(&rbd_dev->watch_mutex); } /* - * Initiate a watch request, synchronously. 
+ * watch_mutex must be locked */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +static int __rbd_register_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_linger_request *handle; rbd_assert(!rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, rbd_watch_cb, @@ -3154,13 +3860,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) return 0; } -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +/* + * watch_mutex must be locked + */ +static void __rbd_unregister_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - if (!rbd_dev->watch_handle) - return; + rbd_assert(rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); if (ret) @@ -3169,17 +3878,106 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) rbd_dev->watch_handle = NULL; } -/* - * Tear down a watch request, synchronously. - */ -static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +static int rbd_register_watch(struct rbd_device *rbd_dev) { - __rbd_dev_header_unwatch_sync(rbd_dev); + int ret; + + mutex_lock(&rbd_dev->watch_mutex); + rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); + ret = __rbd_register_watch(rbd_dev); + if (ret) + goto out; + + rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + +out: + mutex_unlock(&rbd_dev->watch_mutex); + return ret; +} + +static void cancel_tasks_sync(struct rbd_device *rbd_dev) +{ + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + cancel_delayed_work_sync(&rbd_dev->watch_dwork); + cancel_work_sync(&rbd_dev->acquired_lock_work); + cancel_work_sync(&rbd_dev->released_lock_work); + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + cancel_work_sync(&rbd_dev->unlock_work); +} + +static void rbd_unregister_watch(struct rbd_device *rbd_dev) +{ + WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); + cancel_tasks_sync(rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + mutex_unlock(&rbd_dev->watch_mutex); - dout("%s flushing notifies\n", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); } +static void rbd_reregister_watch(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, watch_dwork); + bool was_lock_owner = false; + bool need_to_wake = false; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + down_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) + was_lock_owner = rbd_release_lock(rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) { + mutex_unlock(&rbd_dev->watch_mutex); + goto out; + } + + ret = __rbd_register_watch(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); + if (ret == -EBLACKLISTED || ret == -ENOENT) { + set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); + need_to_wake = true; + } else { + queue_delayed_work(rbd_dev->task_wq, + &rbd_dev->watch_dwork, + RBD_RETRY_DELAY); + } + mutex_unlock(&rbd_dev->watch_mutex); + goto out; + } + + need_to_wake = true; + rbd_dev->watch_state = 
RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + mutex_unlock(&rbd_dev->watch_mutex); + + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + + if (was_lock_owner) { + ret = rbd_try_lock(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration lock failed: %d", + ret); + } + +out: + up_write(&rbd_dev->lock_rwsem); + if (need_to_wake) + wake_requests(rbd_dev, true); +} + /* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code. @@ -3193,7 +3991,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, void *inbound, size_t inbound_size) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages; u32 page_count; @@ -3242,11 +4039,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, obj_request->pages, inbound_size, 0, false, false); - rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; @@ -3267,6 +4061,31 @@ out: return ret; } +/* + * lock_rwsem must be held for read + */ +static void rbd_wait_state_locked(struct rbd_device *rbd_dev) +{ + DEFINE_WAIT(wait); + + do { + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_requests(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, + TASK_UNINTERRUPTIBLE); + up_read(&rbd_dev->lock_rwsem); + schedule(); + down_read(&rbd_dev->lock_rwsem); + } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && + !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); + + finish_wait(&rbd_dev->lock_waitq, &wait); +} + static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3277,6 +4096,7 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; + bool must_be_locked; int result; if (rq->cmd_type != REQ_TYPE_FS) { @@ -3338,6 +4158,10 @@ static void rbd_queue_workfn(struct work_struct *work) if (op_type != OBJ_OP_READ) { snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); + must_be_locked = rbd_is_lock_supported(rbd_dev); + } else { + must_be_locked = rbd_dev->opts->lock_on_read && + rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -3348,11 +4172,25 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } + if (must_be_locked) { + down_read(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED && + !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) + rbd_wait_state_locked(rbd_dev); + + WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^ + !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)); + if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) { + result = -EBLACKLISTED; + goto err_unlock; + } + } + img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_rq; + goto err_unlock; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3370,10 +4208,15 @@ static void rbd_queue_workfn(struct work_struct *work) 
if (result) goto err_img_request; + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); return; err_img_request: rbd_img_request_put(img_request); +err_unlock: + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -3415,7 +4258,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, u64 offset, u64 length, void *buf) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages = NULL; u32 page_count; @@ -3448,11 +4290,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->length, obj_request->offset & ~PAGE_MASK, false, false); - rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; @@ -3621,7 +4460,6 @@ static int rbd_init_request(void *data, struct request *rq, static struct blk_mq_ops rbd_mq_ops = { .queue_rq = rbd_queue_rq, - .map_queue = blk_mq_map_queue, .init_request = rbd_init_request, }; @@ -3752,13 +4590,40 @@ static ssize_t rbd_minor_show(struct device *dev, return sprintf(buf, "%d\n", rbd_dev->minor); } +static ssize_t rbd_client_addr_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + struct ceph_entity_addr *client_addr = + ceph_client_addr(rbd_dev->rbd_client->client); + + return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, + le32_to_cpu(client_addr->nonce)); +} + static ssize_t rbd_client_id_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); return sprintf(buf, "client%lld\n", - ceph_client_id(rbd_dev->rbd_client->client)); + ceph_client_gid(rbd_dev->rbd_client->client)); +} + +static ssize_t rbd_cluster_fsid_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); +} + +static ssize_t rbd_config_info_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%s\n", rbd_dev->config_info); } static ssize_t rbd_pool_show(struct device *dev, @@ -3810,6 +4675,14 @@ static ssize_t rbd_snap_show(struct device *dev, return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); } +static ssize_t rbd_snap_id_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); +} + /* * For a v2 image, shows the chain of parent images, separated by empty * lines. 
For v1 images or if there is no parent, shows "(no parent @@ -3862,13 +4735,17 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); +static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); +static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); +static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); +static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); static struct attribute *rbd_attrs[] = { @@ -3876,12 +4753,16 @@ static struct attribute *rbd_attrs[] = { &dev_attr_features.attr, &dev_attr_major.attr, &dev_attr_minor.attr, + &dev_attr_client_addr.attr, &dev_attr_client_id.attr, + &dev_attr_cluster_fsid.attr, + &dev_attr_config_info.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, &dev_attr_name.attr, &dev_attr_image_id.attr, &dev_attr_current_snap.attr, + &dev_attr_snap_id.attr, &dev_attr_parent.attr, &dev_attr_refresh.attr, NULL @@ -3944,18 +4825,32 @@ static void rbd_spec_free(struct kref *kref) kfree(spec); } -static void rbd_dev_release(struct device *dev) +static void rbd_dev_free(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - bool need_put = !!rbd_dev->opts; + WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); + WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); + kfree(rbd_dev->config_info); rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); kfree(rbd_dev->opts); kfree(rbd_dev); +} + +static void rbd_dev_release(struct device *dev) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + bool need_put = !!rbd_dev->opts; + + if (need_put) { + destroy_workqueue(rbd_dev->task_wq); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + } + + rbd_dev_free(rbd_dev); /* * This is racy, but way better than putting module outside of @@ -3966,25 +4861,34 @@ static void rbd_dev_release(struct device *dev) module_put(THIS_MODULE); } -static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec, - struct rbd_options *opts) +static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec) { struct rbd_device *rbd_dev; - rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); + rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); if (!rbd_dev) return NULL; spin_lock_init(&rbd_dev->lock); - rbd_dev->flags = 0; - atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); ceph_oid_init(&rbd_dev->header_oid); ceph_oloc_init(&rbd_dev->header_oloc); + mutex_init(&rbd_dev->watch_mutex); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); + + init_rwsem(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + INIT_WORK(&rbd_dev->acquired_lock_work, 
rbd_notify_acquired_lock); + INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); + INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); + INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); + init_waitqueue_head(&rbd_dev->lock_waitq); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -3992,9 +4896,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; - rbd_dev->opts = opts; - - /* Initialize the layout used for all rbd requests */ rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; rbd_dev->layout.stripe_count = 1; @@ -4002,15 +4903,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.pool_id = spec->pool_id; RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); - /* - * If this is a mapping rbd_dev (as opposed to a parent one), - * pin our module. We have a ref from do_rbd_add(), so use - * __module_get(). - */ - if (rbd_dev->opts) - __module_get(THIS_MODULE); + return rbd_dev; +} + +/* + * Create a mapping rbd_dev. + */ +static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec, + struct rbd_options *opts) +{ + struct rbd_device *rbd_dev; + + rbd_dev = __rbd_dev_create(rbdc, spec); + if (!rbd_dev) + return NULL; + + rbd_dev->opts = opts; + + /* get an id and fill in device name */ + rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, + minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); + if (rbd_dev->dev_id < 0) + goto fail_rbd_dev; + sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); + rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, + rbd_dev->name); + if (!rbd_dev->task_wq) + goto fail_dev_id; + + /* we have a ref from do_rbd_add() */ + __module_get(THIS_MODULE); + + dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); return rbd_dev; + +fail_dev_id: + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); +fail_rbd_dev: + rbd_dev_free(rbd_dev); + return NULL; } static void rbd_dev_destroy(struct rbd_device *rbd_dev) @@ -4646,46 +5580,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev) } /* - * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. - */ -static int rbd_dev_id_get(struct rbd_device *rbd_dev) -{ - int new_dev_id; - - new_dev_id = ida_simple_get(&rbd_dev_id_ida, - 0, minor_to_rbd_dev_id(1 << MINORBITS), - GFP_KERNEL); - if (new_dev_id < 0) - return new_dev_id; - - rbd_dev->dev_id = new_dev_id; - - spin_lock(&rbd_dev_list_lock); - list_add_tail(&rbd_dev->node, &rbd_dev_list); - spin_unlock(&rbd_dev_list_lock); - - dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); - - return 0; -} - -/* - * Remove an rbd_dev from the global list, and record that its - * identifier is no longer in use. - */ -static void rbd_dev_id_put(struct rbd_device *rbd_dev) -{ - spin_lock(&rbd_dev_list_lock); - list_del_init(&rbd_dev->node); - spin_unlock(&rbd_dev_list_lock); - - ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); - - dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); -} - -/* * Skips over white space at *buf, and updates *buf to point to the * first found non-space character (if any). Returns the length of * the token (string of non-white space characters) found. 
Note @@ -4860,6 +5754,7 @@ static int rbd_add_parse_args(const char *buf, rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; + rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; copts = ceph_parse_options(options, mon_addrs, mon_addrs + mon_addrs_size - 1, @@ -5077,8 +5972,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) goto out_err; } - parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec, - NULL); + parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); if (!parent) { ret = -ENOMEM; goto out_err; @@ -5113,22 +6007,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) { int ret; - /* Get an id and fill in device name. */ - - ret = rbd_dev_id_get(rbd_dev); - if (ret) - goto err_out_unlock; - - BUILD_BUG_ON(DEV_NAME_LEN - < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); - sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* Record our major and minor device numbers. */ if (!single_major) { ret = register_blkdev(0, rbd_dev->name); if (ret < 0) - goto err_out_id; + goto err_out_unlock; rbd_dev->major = ret; rbd_dev->minor = 0; @@ -5160,9 +6044,14 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); + spin_lock(&rbd_dev_list_lock); + list_add_tail(&rbd_dev->node, &rbd_dev_list); + spin_unlock(&rbd_dev_list_lock); + add_disk(rbd_dev->disk); - pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, - (unsigned long long) rbd_dev->mapping.size); + pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, + (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, + rbd_dev->header.features); return ret; @@ -5173,8 +6062,6 @@ err_out_disk: err_out_blkdev: if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); -err_out_id: - rbd_dev_id_put(rbd_dev); err_out_unlock: up_write(&rbd_dev->header_rwsem); return ret; @@ -5235,7 +6122,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_format; if (!depth) { - ret = rbd_dev_header_watch_sync(rbd_dev); + ret = rbd_register_watch(rbd_dev); if (ret) { if (ret == -ENOENT) pr_info("image %s/%s does not exist\n", @@ -5294,7 +6181,7 @@ err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: if (!depth) - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); @@ -5346,10 +6233,18 @@ static ssize_t do_rbd_add(struct bus_type *bus, spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ + rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); + if (!rbd_dev->config_info) { + rc = -ENOMEM; + goto err_out_rbd_dev; + } + down_write(&rbd_dev->header_rwsem); rc = rbd_dev_image_probe(rbd_dev, 0); - if (rc < 0) + if (rc < 0) { + up_write(&rbd_dev->header_rwsem); goto err_out_rbd_dev; + } /* If we are mapping a snapshot it must be marked read-only */ @@ -5361,11 +6256,11 @@ static ssize_t do_rbd_add(struct bus_type *bus, rc = rbd_dev_device_setup(rbd_dev); if (rc) { /* - * rbd_dev_header_unwatch_sync() can't be moved into + * rbd_unregister_watch() can't be moved into * rbd_dev_image_release() without refactoring, see * commit 1f3ef78861ac. 
*/ - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); rbd_dev_image_release(rbd_dev); goto out; } @@ -5376,7 +6271,6 @@ out: return rc; err_out_rbd_dev: - up_write(&rbd_dev->header_rwsem); rbd_dev_destroy(rbd_dev); err_out_client: rbd_put_client(rbdc); @@ -5406,12 +6300,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, static void rbd_dev_device_release(struct rbd_device *rbd_dev) { rbd_free_disk(rbd_dev); + + spin_lock(&rbd_dev_list_lock); + list_del_init(&rbd_dev->node); + spin_unlock(&rbd_dev_list_lock); + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); device_del(&rbd_dev->dev); rbd_dev_mapping_clear(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); - rbd_dev_id_put(rbd_dev); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) @@ -5447,18 +6345,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus, struct rbd_device *rbd_dev = NULL; struct list_head *tmp; int dev_id; - unsigned long ul; + char opt_buf[6]; bool already = false; + bool force = false; int ret; - ret = kstrtoul(buf, 10, &ul); - if (ret) - return ret; - - /* convert to int; abort if we lost anything in the conversion */ - dev_id = (int)ul; - if (dev_id != ul) + dev_id = -1; + opt_buf[0] = '\0'; + sscanf(buf, "%d %5s", &dev_id, opt_buf); + if (dev_id < 0) { + pr_err("dev_id out of range\n"); return -EINVAL; + } + if (opt_buf[0] != '\0') { + if (!strcmp(opt_buf, "force")) { + force = true; + } else { + pr_err("bad remove option at '%s'\n", opt_buf); + return -EINVAL; + } + } ret = -ENOENT; spin_lock(&rbd_dev_list_lock); @@ -5471,7 +6377,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, } if (!ret) { spin_lock_irq(&rbd_dev->lock); - if (rbd_dev->open_count) + if (rbd_dev->open_count && !force) ret = -EBUSY; else already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, @@ -5482,7 +6388,20 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; - rbd_dev_header_unwatch_sync(rbd_dev); + if (force) { + /* + * Prevent new IO from being queued and wait for existing + * IO to complete/fail. + */ + blk_mq_freeze_queue(rbd_dev->disk->queue); + blk_set_queue_dying(rbd_dev->disk->queue); + } + + down_write(&rbd_dev->lock_rwsem); + if (__rbd_is_lock_owner(rbd_dev)) + rbd_unlock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); + rbd_unregister_watch(rbd_dev); /* * Don't free anything from rbd_dev->disk until after all diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 49d77cbcf8bd..94f367db27b0 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -28,6 +28,17 @@ #define RBD_DATA_PREFIX "rbd_data." #define RBD_ID_PREFIX "rbd_id." 
+#define RBD_LOCK_NAME "rbd_lock" +#define RBD_LOCK_TAG "internal" +#define RBD_LOCK_COOKIE_PREFIX "auto" + +enum rbd_notify_op { + RBD_NOTIFY_OP_ACQUIRED_LOCK = 0, + RBD_NOTIFY_OP_RELEASED_LOCK = 1, + RBD_NOTIFY_OP_REQUEST_LOCK = 2, + RBD_NOTIFY_OP_HEADER_UPDATE = 3, +}; + /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata diff --git a/drivers/block/virtio_blk.c b/drivers/block/virtio_blk.c index 93b1aaa5ba3b..5545a679abd8 100644 --- a/drivers/block/virtio_blk.c +++ b/drivers/block/virtio_blk.c @@ -376,7 +376,7 @@ static void virtblk_config_changed(struct virtio_device *vdev) static int init_vq(struct virtio_blk *vblk) { - int err = 0; + int err; int i; vq_callback_t **callbacks; const char **names; @@ -390,13 +390,13 @@ static int init_vq(struct virtio_blk *vblk) if (err) num_vqs = 1; - vblk->vqs = kmalloc(sizeof(*vblk->vqs) * num_vqs, GFP_KERNEL); + vblk->vqs = kmalloc_array(num_vqs, sizeof(*vblk->vqs), GFP_KERNEL); if (!vblk->vqs) return -ENOMEM; - names = kmalloc(sizeof(*names) * num_vqs, GFP_KERNEL); - callbacks = kmalloc(sizeof(*callbacks) * num_vqs, GFP_KERNEL); - vqs = kmalloc(sizeof(*vqs) * num_vqs, GFP_KERNEL); + names = kmalloc_array(num_vqs, sizeof(*names), GFP_KERNEL); + callbacks = kmalloc_array(num_vqs, sizeof(*callbacks), GFP_KERNEL); + vqs = kmalloc_array(num_vqs, sizeof(*vqs), GFP_KERNEL); if (!names || !callbacks || !vqs) { err = -ENOMEM; goto out; @@ -542,7 +542,6 @@ static int virtblk_init_request(void *data, struct request *rq, static struct blk_mq_ops virtio_mq_ops = { .queue_rq = virtio_queue_rq, - .map_queue = blk_mq_map_queue, .complete = virtblk_request_done, .init_request = virtblk_init_request, }; diff --git a/drivers/block/xen-blkfront.c b/drivers/block/xen-blkfront.c index 88ef6d4729b4..9908597c5209 100644 --- a/drivers/block/xen-blkfront.c +++ b/drivers/block/xen-blkfront.c @@ -909,7 +909,6 @@ out_busy: static struct blk_mq_ops blkfront_mq_ops = { .queue_rq = blkif_queue_rq, - .map_queue = blk_mq_map_queue, }; static void blkif_set_queue_limits(struct blkfront_info *info) |
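
For reference, the *LockPayload NotifyMessage that __rbd_notify_op_lock() builds in the rbd.c hunks above is a small fixed-size buffer: the ceph versioned-encoding preamble followed by the notify op and the sender's client id. The standalone sketch below only models that wire layout; it is not kernel code, the helper names (encode_lock_notify, put_le32, put_le64) are invented for illustration, and it assumes the preamble is u8 struct_v, u8 struct_compat and a little-endian u32 payload length (i.e. CEPH_ENCODING_START_BLK_LEN == 6), with all integers little-endian as in the ceph_encode_*() helpers.

/*
 * Illustrative sketch only: models the layout of the lock NotifyMessage
 * built by __rbd_notify_op_lock(); helper names are hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

static void put_le32(uint8_t **p, uint32_t v)
{
	(*p)[0] = v;
	(*p)[1] = v >> 8;
	(*p)[2] = v >> 16;
	(*p)[3] = v >> 24;
	*p += 4;
}

static void put_le64(uint8_t **p, uint64_t v)
{
	put_le32(p, (uint32_t)v);
	put_le32(p, (uint32_t)(v >> 32));
}

/* buf must hold 6 + 4 + 8 + 8 = 26 bytes */
static size_t encode_lock_notify(uint8_t *buf, uint32_t notify_op,
				 uint64_t gid, uint64_t handle)
{
	uint8_t *p = buf;

	*p++ = 2;			/* struct_v */
	*p++ = 1;			/* struct_compat */
	put_le32(&p, 4 + 8 + 8);	/* length of what follows */
	put_le32(&p, notify_op);	/* RBD_NOTIFY_OP_* */
	put_le64(&p, gid);		/* client id: global id */
	put_le64(&p, handle);		/* client id: watch cookie */

	return (size_t)(p - buf);
}

int main(void)
{
	uint8_t buf[26];

	/* example values only */
	printf("encoded %zu bytes\n",
	       encode_lock_notify(buf, 2 /* REQUEST_LOCK */, 1234, 5678));
	return 0;
}

The same payload shape is used for the ACQUIRED_LOCK, RELEASED_LOCK and REQUEST_LOCK notifications; a notify with an empty payload is still treated as a legacy header update, which is why rbd_watch_cb() above falls back to RBD_NOTIFY_OP_HEADER_UPDATE when data_len is zero.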
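
The reworked remove path also accepts an optional second token: writing "<dev_id> force" freezes the request queue and marks it dying before tearing the device down, while a bare dev_id keeps the old refuse-if-open behaviour. Below is a minimal userspace sketch of the parsing done in do_rbd_remove() above; it is illustrative only and the function name parse_remove_arg is invented here.

/*
 * Illustrative sketch of the "<dev_id> [force]" parsing added to
 * do_rbd_remove(); standalone userspace code, names are hypothetical.
 */
#include <stdio.h>
#include <string.h>

static int parse_remove_arg(const char *buf, int *dev_id, int *force)
{
	char opt_buf[6];

	*dev_id = -1;
	*force = 0;
	opt_buf[0] = '\0';
	sscanf(buf, "%d %5s", dev_id, opt_buf);
	if (*dev_id < 0)
		return -1;		/* dev_id out of range */
	if (opt_buf[0] == '\0')
		return 0;		/* no option given */
	if (strcmp(opt_buf, "force") == 0) {
		*force = 1;
		return 0;
	}
	return -1;			/* bad remove option */
}

int main(void)
{
	int id, force;

	if (parse_remove_arg("0 force", &id, &force) == 0)
		printf("dev_id=%d force=%d\n", id, force);
	return 0;
}

With this in place, something like echo "0 force" > /sys/bus/rbd/remove should unmap rbd0 even while it is still held open, at the cost of failing any in-flight IO on that queue.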