diff options
Diffstat (limited to 'net/sunrpc/svc_xprt.c')
-rw-r--r-- | net/sunrpc/svc_xprt.c | 292 |
1 files changed, 162 insertions, 130 deletions
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index bbb3b044b877..c69358b3cf7f 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -220,9 +220,11 @@ static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl, */ static void svc_xprt_received(struct svc_xprt *xprt) { - WARN_ON_ONCE(!test_bit(XPT_BUSY, &xprt->xpt_flags)); - if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) + if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) { + WARN_ONCE(1, "xprt=0x%p already busy!", xprt); return; + } + /* As soon as we clear busy, the xprt could be closed and * 'put', so we need a reference to call svc_xprt_do_enqueue with: */ @@ -310,25 +312,6 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len) } EXPORT_SYMBOL_GPL(svc_print_addr); -/* - * Queue up an idle server thread. Must have pool->sp_lock held. - * Note: this is really a stack rather than a queue, so that we only - * use as many different threads as we need, and the rest don't pollute - * the cache. - */ -static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) -{ - list_add(&rqstp->rq_list, &pool->sp_threads); -} - -/* - * Dequeue an nfsd thread. Must have pool->sp_lock held. - */ -static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) -{ - list_del(&rqstp->rq_list); -} - static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) { if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE))) @@ -341,11 +324,12 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) static void svc_xprt_do_enqueue(struct svc_xprt *xprt) { struct svc_pool *pool; - struct svc_rqst *rqstp; + struct svc_rqst *rqstp = NULL; int cpu; + bool queued = false; if (!svc_xprt_has_something_to_do(xprt)) - return; + goto out; /* Mark transport as busy. It will remain in this state until * the provider calls svc_xprt_received. We update XPT_BUSY @@ -355,43 +339,69 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt) if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) { /* Don't enqueue transport while already enqueued */ dprintk("svc: transport %p busy, not enqueued\n", xprt); - return; + goto out; } cpu = get_cpu(); pool = svc_pool_for_cpu(xprt->xpt_server, cpu); - spin_lock_bh(&pool->sp_lock); - pool->sp_stats.packets++; - - if (!list_empty(&pool->sp_threads)) { - rqstp = list_entry(pool->sp_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: transport %p served by daemon %p\n", - xprt, rqstp); - svc_thread_dequeue(pool, rqstp); - if (rqstp->rq_xprt) - printk(KERN_ERR - "svc_xprt_enqueue: server %p, rq_xprt=%p!\n", - rqstp, rqstp->rq_xprt); - /* Note the order of the following 3 lines: - * We want to assign xprt to rqstp->rq_xprt only _after_ - * we've woken up the process, so that we don't race with - * the lockless check in svc_get_next_xprt(). + atomic_long_inc(&pool->sp_stats.packets); + +redo_search: + /* find a thread for this xprt */ + rcu_read_lock(); + list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { + /* Do a lockless check first */ + if (test_bit(RQ_BUSY, &rqstp->rq_flags)) + continue; + + /* + * Once the xprt has been queued, it can only be dequeued by + * the task that intends to service it. All we can do at that + * point is to try to wake this thread back up so that it can + * do so. */ - svc_xprt_get(xprt); + if (!queued) { + spin_lock_bh(&rqstp->rq_lock); + if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) { + /* already busy, move on... */ + spin_unlock_bh(&rqstp->rq_lock); + continue; + } + + /* this one will do */ + rqstp->rq_xprt = xprt; + svc_xprt_get(xprt); + spin_unlock_bh(&rqstp->rq_lock); + } + rcu_read_unlock(); + + atomic_long_inc(&pool->sp_stats.threads_woken); wake_up_process(rqstp->rq_task); - rqstp->rq_xprt = xprt; - pool->sp_stats.threads_woken++; - } else { + put_cpu(); + goto out; + } + rcu_read_unlock(); + + /* + * We didn't find an idle thread to use, so we need to queue the xprt. + * Do so and then search again. If we find one, we can't hook this one + * up to it directly but we can wake the thread up in the hopes that it + * will pick it up once it searches for a xprt to service. + */ + if (!queued) { + queued = true; dprintk("svc: transport %p put into queue\n", xprt); + spin_lock_bh(&pool->sp_lock); list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); pool->sp_stats.sockets_queued++; + spin_unlock_bh(&pool->sp_lock); + goto redo_search; } - - spin_unlock_bh(&pool->sp_lock); + rqstp = NULL; put_cpu(); +out: + trace_svc_xprt_do_enqueue(xprt, rqstp); } /* @@ -408,22 +418,28 @@ void svc_xprt_enqueue(struct svc_xprt *xprt) EXPORT_SYMBOL_GPL(svc_xprt_enqueue); /* - * Dequeue the first transport. Must be called with the pool->sp_lock held. + * Dequeue the first transport, if there is one. */ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) { - struct svc_xprt *xprt; + struct svc_xprt *xprt = NULL; if (list_empty(&pool->sp_sockets)) - return NULL; - - xprt = list_entry(pool->sp_sockets.next, - struct svc_xprt, xpt_ready); - list_del_init(&xprt->xpt_ready); + goto out; - dprintk("svc: transport %p dequeued, inuse=%d\n", - xprt, atomic_read(&xprt->xpt_ref.refcount)); + spin_lock_bh(&pool->sp_lock); + if (likely(!list_empty(&pool->sp_sockets))) { + xprt = list_first_entry(&pool->sp_sockets, + struct svc_xprt, xpt_ready); + list_del_init(&xprt->xpt_ready); + svc_xprt_get(xprt); + dprintk("svc: transport %p dequeued, inuse=%d\n", + xprt, atomic_read(&xprt->xpt_ref.refcount)); + } + spin_unlock_bh(&pool->sp_lock); +out: + trace_svc_xprt_dequeue(xprt); return xprt; } @@ -484,34 +500,36 @@ static void svc_xprt_release(struct svc_rqst *rqstp) } /* - * External function to wake up a server waiting for data - * This really only makes sense for services like lockd - * which have exactly one thread anyway. + * Some svc_serv's will have occasional work to do, even when a xprt is not + * waiting to be serviced. This function is there to "kick" a task in one of + * those services so that it can wake up and do that work. Note that we only + * bother with pool 0 as we don't need to wake up more than one thread for + * this purpose. */ void svc_wake_up(struct svc_serv *serv) { struct svc_rqst *rqstp; - unsigned int i; struct svc_pool *pool; - for (i = 0; i < serv->sv_nrpools; i++) { - pool = &serv->sv_pools[i]; + pool = &serv->sv_pools[0]; - spin_lock_bh(&pool->sp_lock); - if (!list_empty(&pool->sp_threads)) { - rqstp = list_entry(pool->sp_threads.next, - struct svc_rqst, - rq_list); - dprintk("svc: daemon %p woken up.\n", rqstp); - /* - svc_thread_dequeue(pool, rqstp); - rqstp->rq_xprt = NULL; - */ - wake_up_process(rqstp->rq_task); - } else - pool->sp_task_pending = 1; - spin_unlock_bh(&pool->sp_lock); + rcu_read_lock(); + list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) { + /* skip any that aren't queued */ + if (test_bit(RQ_BUSY, &rqstp->rq_flags)) + continue; + rcu_read_unlock(); + dprintk("svc: daemon %p woken up.\n", rqstp); + wake_up_process(rqstp->rq_task); + trace_svc_wake_up(rqstp->rq_task->pid); + return; } + rcu_read_unlock(); + + /* No free entries available */ + set_bit(SP_TASK_PENDING, &pool->sp_flags); + smp_wmb(); + trace_svc_wake_up(0); } EXPORT_SYMBOL_GPL(svc_wake_up); @@ -622,75 +640,86 @@ static int svc_alloc_arg(struct svc_rqst *rqstp) return 0; } +static bool +rqst_should_sleep(struct svc_rqst *rqstp) +{ + struct svc_pool *pool = rqstp->rq_pool; + + /* did someone call svc_wake_up? */ + if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) + return false; + + /* was a socket queued? */ + if (!list_empty(&pool->sp_sockets)) + return false; + + /* are we shutting down? */ + if (signalled() || kthread_should_stop()) + return false; + + /* are we freezing? */ + if (freezing(current)) + return false; + + return true; +} + static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) { struct svc_xprt *xprt; struct svc_pool *pool = rqstp->rq_pool; long time_left = 0; + /* rq_xprt should be clear on entry */ + WARN_ON_ONCE(rqstp->rq_xprt); + /* Normally we will wait up to 5 seconds for any required * cache information to be provided. */ rqstp->rq_chandle.thread_wait = 5*HZ; - spin_lock_bh(&pool->sp_lock); xprt = svc_xprt_dequeue(pool); if (xprt) { rqstp->rq_xprt = xprt; - svc_xprt_get(xprt); /* As there is a shortage of threads and this request * had to be queued, don't allow the thread to wait so * long for cache updates. */ rqstp->rq_chandle.thread_wait = 1*HZ; - pool->sp_task_pending = 0; - } else { - if (pool->sp_task_pending) { - pool->sp_task_pending = 0; - xprt = ERR_PTR(-EAGAIN); - goto out; - } - /* - * We have to be able to interrupt this wait - * to bring down the daemons ... - */ - set_current_state(TASK_INTERRUPTIBLE); + clear_bit(SP_TASK_PENDING, &pool->sp_flags); + return xprt; + } - /* No data pending. Go to sleep */ - svc_thread_enqueue(pool, rqstp); - spin_unlock_bh(&pool->sp_lock); + /* + * We have to be able to interrupt this wait + * to bring down the daemons ... + */ + set_current_state(TASK_INTERRUPTIBLE); + clear_bit(RQ_BUSY, &rqstp->rq_flags); + smp_mb(); - if (!(signalled() || kthread_should_stop())) { - time_left = schedule_timeout(timeout); - __set_current_state(TASK_RUNNING); + if (likely(rqst_should_sleep(rqstp))) + time_left = schedule_timeout(timeout); + else + __set_current_state(TASK_RUNNING); - try_to_freeze(); + try_to_freeze(); - xprt = rqstp->rq_xprt; - if (xprt != NULL) - return xprt; - } else - __set_current_state(TASK_RUNNING); + spin_lock_bh(&rqstp->rq_lock); + set_bit(RQ_BUSY, &rqstp->rq_flags); + spin_unlock_bh(&rqstp->rq_lock); - spin_lock_bh(&pool->sp_lock); - if (!time_left) - pool->sp_stats.threads_timedout++; + xprt = rqstp->rq_xprt; + if (xprt != NULL) + return xprt; - xprt = rqstp->rq_xprt; - if (!xprt) { - svc_thread_dequeue(pool, rqstp); - spin_unlock_bh(&pool->sp_lock); - dprintk("svc: server %p, no data yet\n", rqstp); - if (signalled() || kthread_should_stop()) - return ERR_PTR(-EINTR); - else - return ERR_PTR(-EAGAIN); - } - } -out: - spin_unlock_bh(&pool->sp_lock); - return xprt; + if (!time_left) + atomic_long_inc(&pool->sp_stats.threads_timedout); + + if (signalled() || kthread_should_stop()) + return ERR_PTR(-EINTR); + return ERR_PTR(-EAGAIN); } static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt) @@ -719,7 +748,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) dprintk("svc_recv: found XPT_CLOSE\n"); svc_delete_xprt(xprt); /* Leave XPT_BUSY set on the dead xprt: */ - return 0; + goto out; } if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) { struct svc_xprt *newxpt; @@ -750,6 +779,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) } /* clear XPT_BUSY: */ svc_xprt_received(xprt); +out: + trace_svc_handle_xprt(xprt, len); return len; } @@ -797,7 +828,10 @@ int svc_recv(struct svc_rqst *rqstp, long timeout) clear_bit(XPT_OLD, &xprt->xpt_flags); - rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp); + if (xprt->xpt_ops->xpo_secure_port(rqstp)) + set_bit(RQ_SECURE, &rqstp->rq_flags); + else + clear_bit(RQ_SECURE, &rqstp->rq_flags); rqstp->rq_chandle.defer = svc_defer; rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]); @@ -895,7 +929,6 @@ static void svc_age_temp_xprts(unsigned long closure) continue; list_del_init(le); set_bit(XPT_CLOSE, &xprt->xpt_flags); - set_bit(XPT_DETACHED, &xprt->xpt_flags); dprintk("queuing xprt %p for closing\n", xprt); /* a thread will dequeue and close it soon */ @@ -935,8 +968,7 @@ static void svc_delete_xprt(struct svc_xprt *xprt) xprt->xpt_ops->xpo_detach(xprt); spin_lock_bh(&serv->sv_lock); - if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags)) - list_del_init(&xprt->xpt_list); + list_del_init(&xprt->xpt_list); WARN_ON_ONCE(!list_empty(&xprt->xpt_ready)); if (test_bit(XPT_TEMP, &xprt->xpt_flags)) serv->sv_tmpcnt--; @@ -1080,7 +1112,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req) struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle); struct svc_deferred_req *dr; - if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral) + if (rqstp->rq_arg.page_len || !test_bit(RQ_USEDEFERRAL, &rqstp->rq_flags)) return NULL; /* if more than a page, give up FIXME */ if (rqstp->rq_deferred) { dr = rqstp->rq_deferred; @@ -1109,7 +1141,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req) } svc_xprt_get(rqstp->rq_xprt); dr->xprt = rqstp->rq_xprt; - rqstp->rq_dropme = true; + set_bit(RQ_DROPME, &rqstp->rq_flags); dr->handle.revisit = svc_revisit; return &dr->handle; @@ -1311,10 +1343,10 @@ static int svc_pool_stats_show(struct seq_file *m, void *p) seq_printf(m, "%u %lu %lu %lu %lu\n", pool->sp_id, - pool->sp_stats.packets, + (unsigned long)atomic_long_read(&pool->sp_stats.packets), pool->sp_stats.sockets_queued, - pool->sp_stats.threads_woken, - pool->sp_stats.threads_timedout); + (unsigned long)atomic_long_read(&pool->sp_stats.threads_woken), + (unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout)); return 0; } |