From 3cf6429a26da5c4d7b795e6d0f8f56ed2e4fdfc0 Mon Sep 17 00:00:00 2001 From: Latchesar Ionkov Date: Sun, 8 Jan 2006 01:04:58 -0800 Subject: [PATCH] v9fs: new multiplexer implementation New multiplexer implementation. Decreases the number of kernel threads required. Better handling when the user process receives a signal. Signed-off-by: Latchesar Ionkov Cc: Eric Van Hensbergen Signed-off-by: Andrew Morton Signed-off-by: Linus Torvalds --- fs/9p/mux.c | 1122 ++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 794 insertions(+), 328 deletions(-) (limited to 'fs/9p/mux.c') diff --git a/fs/9p/mux.c b/fs/9p/mux.c index 8835b576f744..62b6ad0767e1 100644 --- a/fs/9p/mux.c +++ b/fs/9p/mux.c @@ -4,7 +4,7 @@ * Protocol Multiplexer * * Copyright (C) 2004 by Eric Van Hensbergen - * Copyright (C) 2004 by Latchesar Ionkov + * Copyright (C) 2004-2005 by Latchesar Ionkov * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -38,438 +39,903 @@ #include "conv.h" #include "mux.h" -/** - * dprintcond - print condition of session info - * @v9ses: session info structure - * @req: RPC request structure - * - */ +#define ERREQFLUSH 1 +#define SCHED_TIMEOUT 10 +#define MAXPOLLWADDR 2 + +enum { + Rworksched = 1, /* read work scheduled or running */ + Rpending = 2, /* can read */ + Wworksched = 4, /* write work scheduled or running */ + Wpending = 8, /* can write */ +}; + +struct v9fs_mux_poll_task; + +struct v9fs_req { + int tag; + struct v9fs_fcall *tcall; + struct v9fs_fcall *rcall; + int err; + v9fs_mux_req_callback cb; + void *cba; + struct list_head req_list; +}; + +struct v9fs_mux_data { + spinlock_t lock; + struct list_head mux_list; + struct v9fs_mux_poll_task *poll_task; + int msize; + unsigned char *extended; + struct v9fs_transport *trans; + struct v9fs_idpool tidpool; + int err; + wait_queue_head_t equeue; + struct list_head req_list; + struct list_head unsent_req_list; + int rpos; + char *rbuf; + int wpos; + int wsize; + char *wbuf; + wait_queue_t poll_wait[MAXPOLLWADDR]; + wait_queue_head_t *poll_waddr[MAXPOLLWADDR]; + poll_table pt; + struct work_struct rq; + struct work_struct wq; + unsigned long wsched; +}; + +struct v9fs_mux_poll_task { + struct task_struct *task; + struct list_head mux_list; + int muxnum; +}; + +struct v9fs_mux_rpc { + struct v9fs_mux_data *m; + struct v9fs_req *req; + int err; + struct v9fs_fcall *rcall; + wait_queue_head_t wqueue; +}; + +static int v9fs_poll_proc(void *); +static void v9fs_read_work(void *); +static void v9fs_write_work(void *); +static void v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address, + poll_table * p); + +static DECLARE_MUTEX(v9fs_mux_task_lock); +static struct workqueue_struct *v9fs_mux_wq; + +static int v9fs_mux_num; +static int v9fs_mux_poll_task_num; +static struct v9fs_mux_poll_task v9fs_mux_poll_tasks[100]; + +void v9fs_mux_global_init(void) +{ + int i; + + for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) + v9fs_mux_poll_tasks[i].task = NULL; + + v9fs_mux_wq = create_workqueue("v9fs"); +} -static inline int -dprintcond(struct v9fs_session_info *v9ses, struct v9fs_rpcreq *req) +void v9fs_mux_global_exit(void) { - dprintk(DEBUG_MUX, "condition: %d, %p\n", v9ses->transport->status, - req->rcall); - return 0; + destroy_workqueue(v9fs_mux_wq); } /** - * xread - force read of a certain number of bytes - * @v9ses: session info structure - * @ptr: pointer to buffer - * @sz: number of bytes to read + * v9fs_mux_calc_poll_procs - calculates the number of polling procs + * based on the number of mounted v9fs filesystems. * - * Chuck Cranor CS-533 project1 + * The current implementation returns sqrt of the number of mounts. */ +inline int v9fs_mux_calc_poll_procs(int muxnum) +{ + int n; + + if (v9fs_mux_poll_task_num) + n = muxnum / v9fs_mux_poll_task_num + + (muxnum % v9fs_mux_poll_task_num ? 1 : 0); + else + n = 1; + + if (n > ARRAY_SIZE(v9fs_mux_poll_tasks)) + n = ARRAY_SIZE(v9fs_mux_poll_tasks); -static int xread(struct v9fs_session_info *v9ses, void *ptr, unsigned long sz) + return n; +} + +static void v9fs_mux_poll_start(struct v9fs_mux_data *m) { - int rd = 0; - int ret = 0; - while (rd < sz) { - ret = v9ses->transport->read(v9ses->transport, ptr, sz - rd); - if (ret <= 0) { - dprintk(DEBUG_ERROR, "xread errno %d\n", ret); - return ret; + int i, n; + struct v9fs_mux_poll_task *vpt, *vptlast; + + dprintk(DEBUG_MUX, "mux %p muxnum %d procnum %d\n", m, v9fs_mux_num, + v9fs_mux_poll_task_num); + up(&v9fs_mux_task_lock); + + n = v9fs_mux_calc_poll_procs(v9fs_mux_num + 1); + if (n > v9fs_mux_poll_task_num) { + for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) { + if (v9fs_mux_poll_tasks[i].task == NULL) { + vpt = &v9fs_mux_poll_tasks[i]; + dprintk(DEBUG_MUX, "create proc %p\n", vpt); + vpt->task = kthread_create(v9fs_poll_proc, + vpt, "v9fs-poll"); + INIT_LIST_HEAD(&vpt->mux_list); + vpt->muxnum = 0; + v9fs_mux_poll_task_num++; + wake_up_process(vpt->task); + break; + } } - rd += ret; - ptr += ret; - } - return (rd); -} -/** - * read_message - read a full 9P2000 fcall packet - * @v9ses: session info structure - * @rcall: fcall structure to read into - * @rcalllen: size of fcall buffer - * - */ + if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks)) + dprintk(DEBUG_ERROR, "warning: no free poll slots\n"); + } -static int -read_message(struct v9fs_session_info *v9ses, - struct v9fs_fcall *rcall, int rcalllen) -{ - unsigned char buf[4]; - void *data; - int size = 0; - int res = 0; - - res = xread(v9ses, buf, sizeof(buf)); - if (res < 0) { - dprintk(DEBUG_ERROR, - "Reading of count field failed returned: %d\n", res); - return res; + n = (v9fs_mux_num + 1) / v9fs_mux_poll_task_num + + ((v9fs_mux_num + 1) % v9fs_mux_poll_task_num ? 1 : 0); + + vptlast = NULL; + for (i = 0; i < ARRAY_SIZE(v9fs_mux_poll_tasks); i++) { + vpt = &v9fs_mux_poll_tasks[i]; + if (vpt->task != NULL) { + vptlast = vpt; + if (vpt->muxnum < n) { + dprintk(DEBUG_MUX, "put in proc %d\n", i); + list_add(&m->mux_list, &vpt->mux_list); + vpt->muxnum++; + m->poll_task = vpt; + memset(&m->poll_waddr, 0, sizeof(m->poll_waddr)); + init_poll_funcptr(&m->pt, v9fs_pollwait); + break; + } + } } - if (res < 4) { - dprintk(DEBUG_ERROR, - "Reading of count field failed returned: %d\n", res); - return -EIO; + if (i >= ARRAY_SIZE(v9fs_mux_poll_tasks)) { + dprintk(DEBUG_MUX, "put in proc %d\n", i); + list_add(&m->mux_list, &vptlast->mux_list); + vptlast->muxnum++; + m->poll_task = vpt; + memset(&m->poll_waddr, 0, sizeof(m->poll_waddr)); + init_poll_funcptr(&m->pt, v9fs_pollwait); } - size = buf[0] | (buf[1] << 8) | (buf[2] << 16) | (buf[3] << 24); - dprintk(DEBUG_MUX, "got a packet count: %d\n", size); + v9fs_mux_num++; + down(&v9fs_mux_task_lock); +} - /* adjust for the four bytes of size */ - size -= 4; +static void v9fs_mux_poll_stop(struct v9fs_mux_data *m) +{ + int i; + struct v9fs_mux_poll_task *vpt; + + up(&v9fs_mux_task_lock); + vpt = m->poll_task; + list_del(&m->mux_list); + for(i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) { + if (m->poll_waddr[i] != NULL) { + remove_wait_queue(m->poll_waddr[i], &m->poll_wait[i]); + m->poll_waddr[i] = NULL; + } + } + vpt->muxnum--; + if (!vpt->muxnum) { + dprintk(DEBUG_MUX, "destroy proc %p\n", vpt); + send_sig(SIGKILL, vpt->task, 1); + vpt->task = NULL; + v9fs_mux_poll_task_num--; + } + v9fs_mux_num--; + down(&v9fs_mux_task_lock); +} - if (size > v9ses->maxdata) { - dprintk(DEBUG_ERROR, "packet too big: %d\n", size); - return -E2BIG; +/** + * v9fs_mux_init - allocate and initialize the per-session mux data + * Creates the polling task if this is the first session. + * + * @trans - transport structure + * @msize - maximum message size + * @extended - pointer to the extended flag + */ +struct v9fs_mux_data *v9fs_mux_init(struct v9fs_transport *trans, int msize, + unsigned char *extended) +{ + int i, n; + struct v9fs_mux_data *m, *mtmp; + + dprintk(DEBUG_MUX, "transport %p msize %d\n", trans, msize); + m = kmalloc(sizeof(struct v9fs_mux_data) + 2 * msize, GFP_KERNEL); + if (!m) + return ERR_PTR(-ENOMEM); + + spin_lock_init(&m->lock); + INIT_LIST_HEAD(&m->mux_list); + m->msize = msize; + m->extended = extended; + m->trans = trans; + idr_init(&m->tidpool.pool); + init_MUTEX(&m->tidpool.lock); + m->err = 0; + init_waitqueue_head(&m->equeue); + INIT_LIST_HEAD(&m->req_list); + INIT_LIST_HEAD(&m->unsent_req_list); + m->rpos = 0; + m->rbuf = (char *)m + sizeof(struct v9fs_mux_data); + m->wpos = m->wsize = 0; + m->wbuf = m->rbuf + msize; + INIT_WORK(&m->rq, v9fs_read_work, m); + INIT_WORK(&m->wq, v9fs_write_work, m); + m->wsched = 0; + memset(&m->poll_waddr, 0, sizeof(m->poll_waddr)); + v9fs_mux_poll_start(m); + + n = trans->poll(trans, &m->pt); + if (n & POLLIN) { + dprintk(DEBUG_MUX, "mux %p can read\n", m); + set_bit(Rpending, &m->wsched); } - data = kmalloc(size, GFP_KERNEL); - if (!data) { - eprintk(KERN_WARNING, "out of memory\n"); - return -ENOMEM; + if (n & POLLOUT) { + dprintk(DEBUG_MUX, "mux %p can write\n", m); + set_bit(Wpending, &m->wsched); } - res = xread(v9ses, data, size); - if (res < size) { - dprintk(DEBUG_ERROR, "Reading of fcall failed returned: %d\n", - res); - kfree(data); - return res; + for(i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) { + if (IS_ERR(m->poll_waddr[i])) { + v9fs_mux_poll_stop(m); + mtmp = (void *)m->poll_waddr; /* the error code */ + kfree(m); + m = mtmp; + break; + } } - /* we now have an in-memory string that is the reply. - * deserialize it. There is very little to go wrong at this point - * save for v9fs_alloc errors. - */ - res = v9fs_deserialize_fcall(v9ses, size, data, v9ses->maxdata, - rcall, rcalllen); + return m; +} - kfree(data); +/** + * v9fs_mux_destroy - cancels all pending requests and frees mux resources + */ +void v9fs_mux_destroy(struct v9fs_mux_data *m) +{ + dprintk(DEBUG_MUX, "mux %p prev %p next %p\n", m, + m->mux_list.prev, m->mux_list.next); + v9fs_mux_cancel(m, -ECONNRESET); + + if (!list_empty(&m->req_list)) { + /* wait until all processes waiting on this session exit */ + dprintk(DEBUG_MUX, "mux %p waiting for empty request queue\n", + m); + wait_event_timeout(m->equeue, (list_empty(&m->req_list)), 5000); + dprintk(DEBUG_MUX, "mux %p request queue empty: %d\n", m, + list_empty(&m->req_list)); + } - if (res < 0) - return res; + v9fs_mux_poll_stop(m); + m->trans = NULL; - return 0; + kfree(m); } /** - * v9fs_recv - receive an RPC response for a particular tag - * @v9ses: session info structure - * @req: RPC request structure - * + * v9fs_pollwait - called by files poll operation to add v9fs-poll task + * to files wait queue */ - -static int v9fs_recv(struct v9fs_session_info *v9ses, struct v9fs_rpcreq *req) +static void +v9fs_pollwait(struct file *filp, wait_queue_head_t * wait_address, + poll_table * p) { - int ret = 0; - - dprintk(DEBUG_MUX, "waiting for response: %d\n", req->tcall->tag); - ret = wait_event_interruptible(v9ses->read_wait, - ((v9ses->transport->status != Connected) || - (req->rcall != 0) || (req->err < 0) || - dprintcond(v9ses, req))); + int i; + struct v9fs_mux_data *m; - dprintk(DEBUG_MUX, "got it: rcall %p\n", req->rcall); + m = container_of(p, struct v9fs_mux_data, pt); + for(i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) + if (m->poll_waddr[i] == NULL) + break; - spin_lock(&v9ses->muxlock); - list_del(&req->next); - spin_unlock(&v9ses->muxlock); + if (i >= ARRAY_SIZE(m->poll_waddr)) { + dprintk(DEBUG_ERROR, "not enough wait_address slots\n"); + return; + } - if (req->err < 0) - return req->err; + m->poll_waddr[i] = wait_address; - if (v9ses->transport->status == Disconnected) - return -ECONNRESET; + if (!wait_address) { + dprintk(DEBUG_ERROR, "no wait_address\n"); + m->poll_waddr[i] = ERR_PTR(-EIO); + return; + } - return ret; + init_waitqueue_entry(&m->poll_wait[i], m->poll_task->task); + add_wait_queue(wait_address, &m->poll_wait[i]); } /** - * v9fs_send - send a 9P request - * @v9ses: session info structure - * @req: RPC request to send - * + * v9fs_poll_mux - polls a mux and schedules read or write works if necessary */ - -static int v9fs_send(struct v9fs_session_info *v9ses, struct v9fs_rpcreq *req) +static inline void v9fs_poll_mux(struct v9fs_mux_data *m) { - int ret = -1; - void *data = NULL; - struct v9fs_fcall *tcall = req->tcall; - - data = kmalloc(v9ses->maxdata + V9FS_IOHDRSZ, GFP_KERNEL); - if (!data) - return -ENOMEM; - - tcall->size = 0; /* enforce size recalculation */ - ret = - v9fs_serialize_fcall(v9ses, tcall, data, - v9ses->maxdata + V9FS_IOHDRSZ); - if (ret < 0) - goto free_data; - - spin_lock(&v9ses->muxlock); - list_add(&req->next, &v9ses->mux_fcalls); - spin_unlock(&v9ses->muxlock); - - dprintk(DEBUG_MUX, "sending message: tag %d size %d\n", tcall->tag, - tcall->size); - ret = v9ses->transport->write(v9ses->transport, data, tcall->size); - - if (ret != tcall->size) { - spin_lock(&v9ses->muxlock); - list_del(&req->next); - kfree(req->rcall); + int n; - spin_unlock(&v9ses->muxlock); - if (ret >= 0) - ret = -EREMOTEIO; - } else - ret = 0; + if (m->err < 0) + return; + + n = m->trans->poll(m->trans, NULL); + if (n < 0 || n & (POLLERR | POLLHUP | POLLNVAL)) { + dprintk(DEBUG_MUX, "error mux %p err %d\n", m, n); + if (n >= 0) + n = -ECONNRESET; + v9fs_mux_cancel(m, n); + } + + if (n & POLLIN) { + set_bit(Rpending, &m->wsched); + dprintk(DEBUG_MUX, "mux %p can read\n", m); + if (!test_and_set_bit(Rworksched, &m->wsched)) { + dprintk(DEBUG_MUX, "schedule read work mux %p\n", m); + queue_work(v9fs_mux_wq, &m->rq); + } + } - free_data: - kfree(data); - return ret; + if (n & POLLOUT) { + set_bit(Wpending, &m->wsched); + dprintk(DEBUG_MUX, "mux %p can write\n", m); + if ((m->wsize || !list_empty(&m->unsent_req_list)) + && !test_and_set_bit(Wworksched, &m->wsched)) { + dprintk(DEBUG_MUX, "schedule write work mux %p\n", m); + queue_work(v9fs_mux_wq, &m->wq); + } + } } /** - * v9fs_mux_rpc - send a request, receive a response - * @v9ses: session info structure - * @tcall: fcall to send - * @rcall: buffer to place response into - * + * v9fs_poll_proc - polls all v9fs transports for new events and queues + * the appropriate work to the work queue */ - -long -v9fs_mux_rpc(struct v9fs_session_info *v9ses, struct v9fs_fcall *tcall, - struct v9fs_fcall **rcall) +static int v9fs_poll_proc(void *a) { - int tid = -1; - struct v9fs_fcall *fcall = NULL; - struct v9fs_rpcreq req; - int ret = -1; + struct v9fs_mux_data *m, *mtmp; + struct v9fs_mux_poll_task *vpt; - if (!v9ses) - return -EINVAL; + vpt = a; + dprintk(DEBUG_MUX, "start %p %p\n", current, vpt); + allow_signal(SIGKILL); + while (!kthread_should_stop()) { + set_current_state(TASK_INTERRUPTIBLE); + if (signal_pending(current)) + break; - if (!v9ses->transport || v9ses->transport->status != Connected) - return -EIO; + list_for_each_entry_safe(m, mtmp, &vpt->mux_list, mux_list) { + v9fs_poll_mux(m); + } + + dprintk(DEBUG_MUX, "sleeping...\n"); + schedule_timeout(SCHED_TIMEOUT * HZ); + } - if (rcall) - *rcall = NULL; + __set_current_state(TASK_RUNNING); + dprintk(DEBUG_MUX, "finish\n"); + return 0; +} - if (tcall->id != TVERSION) { - tid = v9fs_get_idpool(&v9ses->tidpool); - if (tid < 0) - return -ENOMEM; +static inline int v9fs_write_req(struct v9fs_mux_data *m, struct v9fs_req *req) +{ + int n; + + list_move_tail(&req->req_list, &m->req_list); + n = v9fs_serialize_fcall(req->tcall, m->wbuf, m->msize, *m->extended); + if (n < 0) { + req->err = n; + list_del(&req->req_list); + if (req->cb) { + spin_unlock(&m->lock); + (*req->cb) (req->cba, req->tcall, req->rcall, req->err); + req->cb = NULL; + spin_lock(&m->lock); + } else + kfree(req->rcall); + + kfree(req); } - tcall->tag = tid; + return n; +} - req.tcall = tcall; - req.err = 0; - req.rcall = NULL; +/** + * v9fs_write_work - called when a transport can send some data + */ +static void v9fs_write_work(void *a) +{ + int n, err; + struct v9fs_mux_data *m; + struct v9fs_req *req, *rtmp; - ret = v9fs_send(v9ses, &req); + m = a; - if (ret < 0) { - if (tcall->id != TVERSION) - v9fs_put_idpool(tid, &v9ses->tidpool); - dprintk(DEBUG_MUX, "error %d\n", ret); - return ret; + if (m->err < 0) { + clear_bit(Wworksched, &m->wsched); + return; } - ret = v9fs_recv(v9ses, &req); - - fcall = req.rcall; - - dprintk(DEBUG_MUX, "received: tag=%x, ret=%d\n", tcall->tag, ret); - if (ret == -ERESTARTSYS) { - if (v9ses->transport->status != Disconnected - && tcall->id != TFLUSH) { - unsigned long flags; - - dprintk(DEBUG_MUX, "flushing the tag: %d\n", - tcall->tag); - clear_thread_flag(TIF_SIGPENDING); - v9fs_t_flush(v9ses, tcall->tag); - spin_lock_irqsave(¤t->sighand->siglock, flags); - recalc_sigpending(); - spin_unlock_irqrestore(¤t->sighand->siglock, - flags); - dprintk(DEBUG_MUX, "flushing done\n"); + if (!m->wsize) { + if (list_empty(&m->unsent_req_list)) { + clear_bit(Wworksched, &m->wsched); + return; } - goto release_req; - } else if (ret < 0) - goto release_req; - - if (!fcall) - ret = -EIO; - else { - if (fcall->id == RERROR) { - ret = v9fs_errstr2errno(fcall->params.rerror.error); - if (ret == 0) { /* string match failed */ - if (fcall->params.rerror.errno) - ret = -(fcall->params.rerror.errno); - else - ret = -ESERVERFAULT; - } - } else if (fcall->id != tcall->id + 1) { - dprintk(DEBUG_ERROR, - "fcall mismatch: expected %d, got %d\n", - tcall->id + 1, fcall->id); - ret = -EIO; + err = 0; + spin_lock(&m->lock); + list_for_each_entry_safe(req, rtmp, &m->unsent_req_list, + req_list) { + err = v9fs_write_req(m, req); + if (err > 0) + break; } + + m->wsize = err; + m->wpos = 0; + spin_unlock(&m->lock); } - release_req: - if (tcall->id != TVERSION) - v9fs_put_idpool(tid, &v9ses->tidpool); - if (rcall) - *rcall = fcall; - else - kfree(fcall); + dprintk(DEBUG_MUX, "mux %p pos %d size %d\n", m, m->wpos, m->wsize); + clear_bit(Wpending, &m->wsched); + err = m->trans->write(m->trans, m->wbuf + m->wpos, m->wsize - m->wpos); + dprintk(DEBUG_MUX, "mux %p sent %d bytes\n", m, err); + if (err == -EAGAIN) { + clear_bit(Wworksched, &m->wsched); + return; + } + + if (err <= 0) + goto error; + + m->wpos += err; + if (m->wpos == m->wsize) + m->wpos = m->wsize = 0; + + if (m->wsize == 0 && !list_empty(&m->unsent_req_list)) { + if (test_and_clear_bit(Wpending, &m->wsched)) + n = POLLOUT; + else + n = m->trans->poll(m->trans, NULL); + + if (n & POLLOUT) { + dprintk(DEBUG_MUX, "schedule write work mux %p\n", m); + queue_work(v9fs_mux_wq, &m->wq); + } else + clear_bit(Wworksched, &m->wsched); + } else + clear_bit(Wworksched, &m->wsched); - return ret; + return; + + error: + v9fs_mux_cancel(m, err); + clear_bit(Wworksched, &m->wsched); } -/** - * v9fs_mux_cancel_requests - cancels all pending requests - * - * @v9ses: session info structure - * @err: error code to return to the requests - */ -void v9fs_mux_cancel_requests(struct v9fs_session_info *v9ses, int err) +static void process_request(struct v9fs_mux_data *m, struct v9fs_req *req) { - struct v9fs_rpcreq *rptr; - struct v9fs_rpcreq *rreq; + int ecode, tag; + char *ename; + + tag = req->tag; + if (req->rcall->id == RERROR && !req->err) { + ecode = req->rcall->params.rerror.errno; + ename = req->rcall->params.rerror.error; - dprintk(DEBUG_MUX, " %d\n", err); - spin_lock(&v9ses->muxlock); - list_for_each_entry_safe(rreq, rptr, &v9ses->mux_fcalls, next) { - rreq->err = err; + dprintk(DEBUG_MUX, "Rerror %s\n", ename); + + if (*m->extended) + req->err = -ecode; + + if (!req->err) { + req->err = v9fs_errstr2errno(ename); + + if (!req->err) { /* string match failed */ + dprintk(DEBUG_ERROR, "unknown error: %s\n", + ename); + } + + if (!req->err) + req->err = -ESERVERFAULT; + } + } else if (req->tcall && req->rcall->id != req->tcall->id + 1) { + dprintk(DEBUG_ERROR, "fcall mismatch: expected %d, got %d\n", + req->tcall->id + 1, req->rcall->id); + if (!req->err) + req->err = -EIO; } - spin_unlock(&v9ses->muxlock); - wake_up_all(&v9ses->read_wait); + + if (req->cb && req->err != ERREQFLUSH) { + dprintk(DEBUG_MUX, "calling callback tcall %p rcall %p\n", + req->tcall, req->rcall); + + (*req->cb) (req->cba, req->tcall, req->rcall, req->err); + req->cb = NULL; + } else + kfree(req->rcall); + + if (tag != V9FS_NOTAG) + v9fs_put_idpool(tag, &m->tidpool); + + wake_up(&m->equeue); + kfree(req); } /** - * v9fs_recvproc - kproc to handle demultiplexing responses - * @data: session info structure - * + * v9fs_read_work - called when there is some data to be read from a transport */ - -static int v9fs_recvproc(void *data) +static void v9fs_read_work(void *a) { - struct v9fs_session_info *v9ses = (struct v9fs_session_info *)data; - struct v9fs_fcall *rcall = NULL; - struct v9fs_rpcreq *rptr; - struct v9fs_rpcreq *req; - struct v9fs_rpcreq *rreq; - int err = 0; + int n, err, rcallen; + struct v9fs_mux_data *m; + struct v9fs_req *req, *rptr, *rreq; + struct v9fs_fcall *rcall; + + m = a; + + if (m->err < 0) + return; + + rcall = NULL; + dprintk(DEBUG_MUX, "start mux %p pos %d\n", m, m->rpos); + clear_bit(Rpending, &m->wsched); + err = m->trans->read(m->trans, m->rbuf + m->rpos, m->msize - m->rpos); + dprintk(DEBUG_MUX, "mux %p got %d bytes\n", m, err); + if (err == -EAGAIN) { + clear_bit(Rworksched, &m->wsched); + return; + } - allow_signal(SIGKILL); - set_current_state(TASK_INTERRUPTIBLE); - complete(&v9ses->proccmpl); - while (!kthread_should_stop() && err >= 0) { - req = rptr = rreq = NULL; + if (err <= 0) + goto error; - rcall = kmalloc(v9ses->maxdata + V9FS_IOHDRSZ, GFP_KERNEL); - if (!rcall) { - eprintk(KERN_ERR, "no memory for buffers\n"); + m->rpos += err; + while (m->rpos > 4) { + n = le32_to_cpu(*(__le32 *) m->rbuf); + if (n >= m->msize) { + dprintk(DEBUG_ERROR, + "requested packet size too big: %d\n", n); + err = -EIO; + goto error; + } + + if (m->rpos < n) break; + + rcallen = n + V9FS_FCALLHDRSZ; + rcall = kmalloc(rcallen, GFP_KERNEL); + if (!rcall) { + err = -ENOMEM; + goto error; } - err = read_message(v9ses, rcall, v9ses->maxdata + V9FS_IOHDRSZ); - spin_lock(&v9ses->muxlock); + dump_data(m->rbuf, n); + err = v9fs_deserialize_fcall(m->rbuf, n, rcall, rcallen, + *m->extended); if (err < 0) { - list_for_each_entry_safe(rreq, rptr, &v9ses->mux_fcalls, next) { - rreq->err = err; - } - if(err != -ERESTARTSYS) - eprintk(KERN_ERR, - "Transport error while reading message %d\n", err); - } else { - list_for_each_entry_safe(rreq, rptr, &v9ses->mux_fcalls, next) { - if (rreq->tcall->tag == rcall->tag) { - req = rreq; - req->rcall = rcall; - break; - } - } + kfree(rcall); + goto error; } - if (req && (req->tcall->id == TFLUSH)) { - struct v9fs_rpcreq *treq = NULL; - list_for_each_entry_safe(treq, rptr, &v9ses->mux_fcalls, next) { - if (treq->tcall->tag == - req->tcall->params.tflush.oldtag) { - list_del(&rptr->next); - kfree(treq->rcall); - break; - } + dprintk(DEBUG_MUX, "mux %p fcall id %d tag %d\n", m, rcall->id, + rcall->tag); + + req = NULL; + spin_lock(&m->lock); + list_for_each_entry_safe(rreq, rptr, &m->req_list, req_list) { + if (rreq->tag == rcall->tag) { + req = rreq; + req->rcall = rcall; + list_del(&req->req_list); + spin_unlock(&m->lock); + process_request(m, req); + break; } } - spin_unlock(&v9ses->muxlock); - if (!req) { - if (err >= 0) + spin_unlock(&m->lock); + if (err >= 0 && rcall->id != RFLUSH) dprintk(DEBUG_ERROR, - "unexpected response: id %d tag %d\n", - rcall->id, rcall->tag); - + "unexpected response mux %p id %d tag %d\n", + m, rcall->id, rcall->tag); kfree(rcall); } - wake_up_all(&v9ses->read_wait); - set_current_state(TASK_INTERRUPTIBLE); + if (m->rpos > n) + memmove(m->rbuf, m->rbuf + n, m->rpos - n); + m->rpos -= n; } - v9ses->transport->close(v9ses->transport); - - /* Inform all pending processes about the failure */ - wake_up_all(&v9ses->read_wait); - - if (signal_pending(current)) - complete(&v9ses->proccmpl); + if (!list_empty(&m->req_list)) { + if (test_and_clear_bit(Rpending, &m->wsched)) + n = POLLIN; + else + n = m->trans->poll(m->trans, NULL); + + if (n & POLLIN) { + dprintk(DEBUG_MUX, "schedule read work mux %p\n", m); + queue_work(v9fs_mux_wq, &m->rq); + } else + clear_bit(Rworksched, &m->wsched); + } else + clear_bit(Rworksched, &m->wsched); - dprintk(DEBUG_MUX, "recvproc: end\n"); - v9ses->recvproc = NULL; + return; - return err >= 0; + error: + v9fs_mux_cancel(m, err); + clear_bit(Rworksched, &m->wsched); } /** - * v9fs_mux_init - initialize multiplexer (spawn kproc) - * @v9ses: session info structure - * @dev_name: mount device information (to create unique kproc) + * v9fs_send_request - send 9P request + * The function can sleep until the request is scheduled for sending. + * The function can be interrupted. Return from the function is not + * a guarantee that the request is sent succesfully. Can return errors + * that can be retrieved by PTR_ERR macros. * + * @m: mux data + * @tc: request to be sent + * @cb: callback function to call when response is received + * @cba: parameter to pass to the callback function */ +static struct v9fs_req *v9fs_send_request(struct v9fs_mux_data *m, + struct v9fs_fcall *tc, + v9fs_mux_req_callback cb, void *cba) +{ + int n; + struct v9fs_req *req; + + dprintk(DEBUG_MUX, "mux %p task %p tcall %p id %d\n", m, current, + tc, tc->id); + if (m->err < 0) + return ERR_PTR(m->err); + + req = kmalloc(sizeof(struct v9fs_req), GFP_KERNEL); + if (!req) + return ERR_PTR(-ENOMEM); -int v9fs_mux_init(struct v9fs_session_info *v9ses, const char *dev_name) + if (tc->id == TVERSION) + n = V9FS_NOTAG; + else + n = v9fs_get_idpool(&m->tidpool); + + if (n < 0) + return ERR_PTR(-ENOMEM); + + tc->tag = n; + req->tag = n; + req->tcall = tc; + req->rcall = NULL; + req->err = 0; + req->cb = cb; + req->cba = cba; + + spin_lock(&m->lock); + list_add_tail(&req->req_list, &m->unsent_req_list); + spin_unlock(&m->lock); + + if (test_and_clear_bit(Wpending, &m->wsched)) + n = POLLOUT; + else + n = m->trans->poll(m->trans, NULL); + + if (n & POLLOUT && !test_and_set_bit(Wworksched, &m->wsched)) + queue_work(v9fs_mux_wq, &m->wq); + + return req; +} + +static inline void +v9fs_mux_flush_cb(void *a, struct v9fs_fcall *tc, struct v9fs_fcall *rc, + int err) { - char procname[60]; - - strncpy(procname, dev_name, sizeof(procname)); - procname[sizeof(procname) - 1] = 0; - - init_waitqueue_head(&v9ses->read_wait); - init_completion(&v9ses->fcread); - init_completion(&v9ses->proccmpl); - spin_lock_init(&v9ses->muxlock); - INIT_LIST_HEAD(&v9ses->mux_fcalls); - v9ses->recvproc = NULL; - v9ses->curfcall = NULL; - - v9ses->recvproc = kthread_create(v9fs_recvproc, v9ses, - "v9fs_recvproc %s", procname); - - if (IS_ERR(v9ses->recvproc)) { - eprintk(KERN_ERR, "cannot create receiving thread\n"); - v9fs_session_close(v9ses); - return -ECONNABORTED; + v9fs_mux_req_callback cb; + int tag; + struct v9fs_mux_data *m; + struct v9fs_req *req, *rptr; + + m = a; + dprintk(DEBUG_MUX, "mux %p tc %p rc %p err %d oldtag %d\n", m, tc, + rc, err, tc->params.tflush.oldtag); + + spin_lock(&m->lock); + cb = NULL; + tag = tc->params.tflush.oldtag; + list_for_each_entry_safe(req, rptr, &m->req_list, req_list) { + if (req->tag == tag) { + list_del(&req->req_list); + if (req->cb) { + cb = req->cb; + req->cb = NULL; + spin_unlock(&m->lock); + (*cb) (req->cba, req->tcall, req->rcall, + req->err); + } + kfree(req); + wake_up(&m->equeue); + break; + } + } + + if (!cb) + spin_unlock(&m->lock); + + if (v9fs_check_idpool(tag, &m->tidpool)) + v9fs_put_idpool(tag, &m->tidpool); + + kfree(tc); + kfree(rc); +} + +static void +v9fs_mux_flush_request(struct v9fs_mux_data *m, struct v9fs_req *req) +{ + struct v9fs_fcall *fc; + + dprintk(DEBUG_MUX, "mux %p req %p tag %d\n", m, req, req->tag); + + fc = kmalloc(sizeof(struct v9fs_fcall), GFP_KERNEL); + fc->id = TFLUSH; + fc->params.tflush.oldtag = req->tag; + + v9fs_send_request(m, fc, v9fs_mux_flush_cb, m); +} + +static void +v9fs_mux_rpc_cb(void *a, struct v9fs_fcall *tc, struct v9fs_fcall *rc, int err) +{ + struct v9fs_mux_rpc *r; + + if (err == ERREQFLUSH) { + dprintk(DEBUG_MUX, "err req flush\n"); + return; + } + + r = a; + dprintk(DEBUG_MUX, "mux %p req %p tc %p rc %p err %d\n", r->m, r->req, + tc, rc, err); + r->rcall = rc; + r->err = err; + wake_up(&r->wqueue); +} + +/** + * v9fs_mux_rpc - sends 9P request and waits until a response is available. + * The function can be interrupted. + * @m: mux data + * @tc: request to be sent + * @rc: pointer where a pointer to the response is stored + */ +int +v9fs_mux_rpc(struct v9fs_mux_data *m, struct v9fs_fcall *tc, + struct v9fs_fcall **rc) +{ + int err; + unsigned long flags; + struct v9fs_req *req; + struct v9fs_mux_rpc r; + + r.err = 0; + r.rcall = NULL; + r.m = m; + init_waitqueue_head(&r.wqueue); + + if (rc) + *rc = NULL; + + req = v9fs_send_request(m, tc, v9fs_mux_rpc_cb, &r); + if (IS_ERR(req)) { + err = PTR_ERR(req); + dprintk(DEBUG_MUX, "error %d\n", err); + return PTR_ERR(req); + } + + r.req = req; + dprintk(DEBUG_MUX, "mux %p tc %p tag %d rpc %p req %p\n", m, tc, + req->tag, &r, req); + err = wait_event_interruptible(r.wqueue, r.rcall != NULL || r.err < 0); + if (r.err < 0) + err = r.err; + + if (err == -ERESTARTSYS && m->trans->status == Connected && m->err == 0) { + spin_lock(&m->lock); + req->tcall = NULL; + req->err = ERREQFLUSH; + spin_unlock(&m->lock); + + clear_thread_flag(TIF_SIGPENDING); + v9fs_mux_flush_request(m, req); + spin_lock_irqsave(¤t->sighand->siglock, flags); + recalc_sigpending(); + spin_unlock_irqrestore(¤t->sighand->siglock, flags); } - wake_up_process(v9ses->recvproc); - wait_for_completion(&v9ses->proccmpl); + if (!err) { + if (r.rcall) + dprintk(DEBUG_MUX, "got response id %d tag %d\n", + r.rcall->id, r.rcall->tag); + + if (rc) + *rc = r.rcall; + else + kfree(r.rcall); + } else { + kfree(r.rcall); + dprintk(DEBUG_MUX, "got error %d\n", err); + if (err > 0) + err = -EIO; + } + + return err; +} + +/** + * v9fs_mux_rpcnb - sends 9P request without waiting for response. + * @m: mux data + * @tc: request to be sent + * @cb: callback function to be called when response arrives + * @cba: value to pass to the callback function + */ +int v9fs_mux_rpcnb(struct v9fs_mux_data *m, struct v9fs_fcall *tc, + v9fs_mux_req_callback cb, void *a) +{ + int err; + struct v9fs_req *req; + + req = v9fs_send_request(m, tc, cb, a); + if (IS_ERR(req)) { + err = PTR_ERR(req); + dprintk(DEBUG_MUX, "error %d\n", err); + return PTR_ERR(req); + } + dprintk(DEBUG_MUX, "mux %p tc %p tag %d\n", m, tc, req->tag); return 0; } + +/** + * v9fs_mux_cancel - cancel all pending requests with error + * @m: mux data + * @err: error code + */ +void v9fs_mux_cancel(struct v9fs_mux_data *m, int err) +{ + struct v9fs_req *req, *rtmp; + LIST_HEAD(cancel_list); + + dprintk(DEBUG_MUX, "mux %p err %d\n", m, err); + m->err = err; + spin_lock(&m->lock); + list_for_each_entry_safe(req, rtmp, &m->req_list, req_list) { + list_move(&req->req_list, &cancel_list); + } + spin_unlock(&m->lock); + + list_for_each_entry_safe(req, rtmp, &cancel_list, req_list) { + list_del(&req->req_list); + if (!req->err) + req->err = err; + + if (req->cb) + (*req->cb) (req->cba, req->tcall, req->rcall, req->err); + else + kfree(req->rcall); + + kfree(req); + } + + wake_up(&m->equeue); +} -- cgit v1.2.1