diff options
Diffstat (limited to 'fs/nfsd/nfs4callback.c')
-rw-r--r-- | fs/nfsd/nfs4callback.c | 150 |
1 files changed, 110 insertions, 40 deletions
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c index 397eb7820929..c3b11a715082 100644 --- a/fs/nfsd/nfs4callback.c +++ b/fs/nfsd/nfs4callback.c @@ -512,11 +512,9 @@ static int nfs4_xdr_dec_cb_recall(struct rpc_rqst *rqstp, if (unlikely(status)) return status; - if (cb != NULL) { - status = decode_cb_sequence4res(xdr, cb); - if (unlikely(status || cb->cb_seq_status)) - return status; - } + status = decode_cb_sequence4res(xdr, cb); + if (unlikely(status || cb->cb_seq_status)) + return status; return decode_cb_op_status(xdr, OP_CB_RECALL, &cb->cb_status); } @@ -604,11 +602,10 @@ static int nfs4_xdr_dec_cb_layout(struct rpc_rqst *rqstp, if (unlikely(status)) return status; - if (cb) { - status = decode_cb_sequence4res(xdr, cb); - if (unlikely(status || cb->cb_seq_status)) - return status; - } + status = decode_cb_sequence4res(xdr, cb); + if (unlikely(status || cb->cb_seq_status)) + return status; + return decode_cb_op_status(xdr, OP_CB_LAYOUTRECALL, &cb->cb_status); } #endif /* CONFIG_NFSD_PNFS */ @@ -663,11 +660,10 @@ static int nfs4_xdr_dec_cb_notify_lock(struct rpc_rqst *rqstp, if (unlikely(status)) return status; - if (cb) { - status = decode_cb_sequence4res(xdr, cb); - if (unlikely(status || cb->cb_seq_status)) - return status; - } + status = decode_cb_sequence4res(xdr, cb); + if (unlikely(status || cb->cb_seq_status)) + return status; + return decode_cb_op_status(xdr, OP_CB_NOTIFY_LOCK, &cb->cb_status); } @@ -759,11 +755,10 @@ static int nfs4_xdr_dec_cb_offload(struct rpc_rqst *rqstp, if (unlikely(status)) return status; - if (cb) { - status = decode_cb_sequence4res(xdr, cb); - if (unlikely(status || cb->cb_seq_status)) - return status; - } + status = decode_cb_sequence4res(xdr, cb); + if (unlikely(status || cb->cb_seq_status)) + return status; + return decode_cb_op_status(xdr, OP_CB_OFFLOAD, &cb->cb_status); } /* @@ -828,7 +823,41 @@ static const struct rpc_program cb_program = { static int max_cb_time(struct net *net) { struct nfsd_net *nn = net_generic(net, nfsd_net_id); - return max(nn->nfsd4_lease/10, (time_t)1) * HZ; + + /* + * nfsd4_lease is set to at most one hour in __nfsd4_write_time, + * so we can use 32-bit math on it. Warn if that assumption + * ever stops being true. + */ + if (WARN_ON_ONCE(nn->nfsd4_lease > 3600)) + return 360 * HZ; + + return max(((u32)nn->nfsd4_lease)/10, 1u) * HZ; +} + +static struct workqueue_struct *callback_wq; + +static bool nfsd4_queue_cb(struct nfsd4_callback *cb) +{ + return queue_work(callback_wq, &cb->cb_work); +} + +static void nfsd41_cb_inflight_begin(struct nfs4_client *clp) +{ + atomic_inc(&clp->cl_cb_inflight); +} + +static void nfsd41_cb_inflight_end(struct nfs4_client *clp) +{ + + if (atomic_dec_and_test(&clp->cl_cb_inflight)) + wake_up_var(&clp->cl_cb_inflight); +} + +static void nfsd41_cb_inflight_wait_complete(struct nfs4_client *clp) +{ + wait_var_event(&clp->cl_cb_inflight, + !atomic_read(&clp->cl_cb_inflight)); } static const struct cred *get_backchannel_cred(struct nfs4_client *clp, struct rpc_clnt *client, struct nfsd4_session *ses) @@ -942,14 +971,21 @@ static void nfsd4_cb_probe_done(struct rpc_task *task, void *calldata) clp->cl_cb_state = NFSD4_CB_UP; } +static void nfsd4_cb_probe_release(void *calldata) +{ + struct nfs4_client *clp = container_of(calldata, struct nfs4_client, cl_cb_null); + + nfsd41_cb_inflight_end(clp); + +} + static const struct rpc_call_ops nfsd4_cb_probe_ops = { /* XXX: release method to ensure we set the cb channel down if * necessary on early failure? */ .rpc_call_done = nfsd4_cb_probe_done, + .rpc_release = nfsd4_cb_probe_release, }; -static struct workqueue_struct *callback_wq; - /* * Poke the callback thread to process any updates to the callback * parameters, and send a null probe. @@ -980,9 +1016,12 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn) * If the slot is available, then mark it busy. Otherwise, set the * thread for sleeping on the callback RPC wait queue. */ -static bool nfsd41_cb_get_slot(struct nfs4_client *clp, struct rpc_task *task) +static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task) { - if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) { + struct nfs4_client *clp = cb->cb_clp; + + if (!cb->cb_holds_slot && + test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) { rpc_sleep_on(&clp->cl_cb_waitq, task, NULL); /* Race breaker */ if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) { @@ -991,9 +1030,31 @@ static bool nfsd41_cb_get_slot(struct nfs4_client *clp, struct rpc_task *task) } rpc_wake_up_queued_task(&clp->cl_cb_waitq, task); } + cb->cb_holds_slot = true; return true; } +static void nfsd41_cb_release_slot(struct nfsd4_callback *cb) +{ + struct nfs4_client *clp = cb->cb_clp; + + if (cb->cb_holds_slot) { + cb->cb_holds_slot = false; + clear_bit(0, &clp->cl_cb_slot_busy); + rpc_wake_up_next(&clp->cl_cb_waitq); + } +} + +static void nfsd41_destroy_cb(struct nfsd4_callback *cb) +{ + struct nfs4_client *clp = cb->cb_clp; + + nfsd41_cb_release_slot(cb); + if (cb->cb_ops && cb->cb_ops->release) + cb->cb_ops->release(cb); + nfsd41_cb_inflight_end(clp); +} + /* * TODO: cb_sequence should support referring call lists, cachethis, multiple * slots, and mark callback channel down on communication errors. @@ -1010,11 +1071,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata) */ cb->cb_seq_status = 1; cb->cb_status = 0; - if (minorversion) { - if (!cb->cb_holds_slot && !nfsd41_cb_get_slot(clp, task)) - return; - cb->cb_holds_slot = true; - } + if (minorversion && !nfsd41_cb_get_slot(cb, task)) + return; rpc_call_start(task); } @@ -1077,13 +1135,12 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback } break; default: + nfsd4_mark_cb_fault(cb->cb_clp, cb->cb_seq_status); dprintk("%s: unprocessed error %d\n", __func__, cb->cb_seq_status); } - cb->cb_holds_slot = false; - clear_bit(0, &clp->cl_cb_slot_busy); - rpc_wake_up_next(&clp->cl_cb_waitq); + nfsd41_cb_release_slot(cb); dprintk("%s: freed slot, new seqid=%d\n", __func__, clp->cl_cb_session->se_cb_seq_nr); @@ -1096,8 +1153,10 @@ retry_nowait: ret = false; goto out; need_restart: - task->tk_status = 0; - cb->cb_need_restart = true; + if (!test_bit(NFSD4_CLIENT_CB_KILL, &clp->cl_flags)) { + task->tk_status = 0; + cb->cb_need_restart = true; + } return false; } @@ -1139,9 +1198,9 @@ static void nfsd4_cb_release(void *calldata) struct nfsd4_callback *cb = calldata; if (cb->cb_need_restart) - nfsd4_run_cb(cb); + nfsd4_queue_cb(cb); else - cb->cb_ops->release(cb); + nfsd41_destroy_cb(cb); } @@ -1175,6 +1234,7 @@ void nfsd4_shutdown_callback(struct nfs4_client *clp) */ nfsd4_run_cb(&clp->cl_cb_null); flush_workqueue(callback_wq); + nfsd41_cb_inflight_wait_complete(clp); } /* requires cl_lock: */ @@ -1192,6 +1252,12 @@ static struct nfsd4_conn * __nfsd4_find_backchannel(struct nfs4_client *clp) return NULL; } +/* + * Note there isn't a lot of locking in this code; instead we depend on + * the fact that it is run from the callback_wq, which won't run two + * work items at once. So, for example, callback_wq handles all access + * of cl_cb_client and all calls to rpc_create or rpc_shutdown_client. + */ static void nfsd4_process_cb_update(struct nfsd4_callback *cb) { struct nfs4_cb_conn conn; @@ -1260,8 +1326,7 @@ nfsd4_run_cb_work(struct work_struct *work) clnt = clp->cl_cb_client; if (!clnt) { /* Callback channel broken, or client killed; give up: */ - if (cb->cb_ops && cb->cb_ops->release) - cb->cb_ops->release(cb); + nfsd41_destroy_cb(cb); return; } @@ -1270,6 +1335,7 @@ nfsd4_run_cb_work(struct work_struct *work) */ if (!cb->cb_ops && clp->cl_minorversion) { clp->cl_cb_state = NFSD4_CB_UP; + nfsd41_destroy_cb(cb); return; } @@ -1295,5 +1361,9 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp, void nfsd4_run_cb(struct nfsd4_callback *cb) { - queue_work(callback_wq, &cb->cb_work); + struct nfs4_client *clp = cb->cb_clp; + + nfsd41_cb_inflight_begin(clp); + if (!nfsd4_queue_cb(cb)) + nfsd41_cb_inflight_end(clp); } |