summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/rds/connection.c19
-rw-r--r--net/rds/ib_send.c2
-rw-r--r--net/rds/rds.h5
-rw-r--r--net/rds/send.c82
-rw-r--r--net/rds/threads.c2
5 files changed, 52 insertions, 58 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 5bb0eec5ada3..89871db77f8f 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- spin_lock_init(&conn->c_send_lock);
- atomic_set(&conn->c_send_generation, 1);
- atomic_set(&conn->c_senders, 0);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
}
mutex_unlock(&conn->c_cm_lock);
- /* verify everybody's out of rds_send_xmit() */
- spin_lock_irq(&conn->c_send_lock);
- spin_unlock_irq(&conn->c_send_lock);
-
- while(atomic_read(&conn->c_senders)) {
- schedule_timeout(1);
- spin_lock_irq(&conn->c_send_lock);
- spin_unlock_irq(&conn->c_send_lock);
- }
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- spin_is_locked(&conn->c_send_lock), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 3f91e794eae9..e88cb4af009b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
* credits (see rds_ib_send_add_credits below).
*
* The RDS send code is essentially single-threaded; rds_send_xmit
- * grabs c_send_lock to ensure exclusive access to the send ring.
+ * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
* However, the ACK sending code is independent and can race with
* message SENDs.
*
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 270ded76fd53..4510344ce8ca 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -80,6 +80,7 @@ enum {
/* Bits for c_flags */
#define RDS_LL_SEND_FULL 0
#define RDS_RECONNECT_PENDING 1
+#define RDS_IN_XMIT 2
struct rds_connection {
struct hlist_node c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
struct rds_cong_map *c_lcong;
struct rds_cong_map *c_fcong;
- spinlock_t c_send_lock; /* protect send ring */
- atomic_t c_send_generation;
- atomic_t c_senders;
struct rds_message *c_xmit_rm;
unsigned long c_xmit_sg;
unsigned int c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
struct delayed_work c_conn_w;
struct work_struct c_down_w;
struct mutex c_cm_lock; /* protect conn state & cm */
+ wait_queue_head_t c_waitq;
struct list_head c_map_item;
unsigned long c_map_queued;
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
/*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
{
struct rds_message *rm, *tmp;
unsigned long flags;
- spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
rds_message_unmapped(rm);
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
rds_message_put(rm);
- } else {
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
}
conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
spin_unlock_irqrestore(&conn->c_lock, flags);
}
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+ return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+ clear_bit(RDS_IN_XMIT, &conn->c_flags);
+ smp_mb__after_clear_bit();
+ /*
+ * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+ * hot path and finding waiters is very rare. We don't want to walk
+ * the system-wide hashed waitqueue buckets in the fast path only to
+ * almost never find waiters.
+ */
+ if (waitqueue_active(&conn->c_waitq))
+ wake_up_all(&conn->c_waitq);
+}
+
/*
* We're making the concious trade-off here to only send one message
* down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
unsigned int tmp;
struct scatterlist *sg;
int ret = 0;
- int gen = 0;
LIST_HEAD(to_be_dropped);
restart:
- if (!rds_conn_up(conn))
- goto out;
/*
* sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
- if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+ if (!acquire_in_xmit(conn)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
- atomic_inc(&conn->c_senders);
+
+ /*
+ * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+ * we do the opposite to avoid races.
+ */
+ if (!rds_conn_up(conn)) {
+ release_in_xmit(conn);
+ ret = 0;
+ goto out;
+ }
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
- gen = atomic_inc_return(&conn->c_send_generation);
-
/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
if (!rm) {
unsigned int len;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
if (!rm)
break;
@@ -207,10 +226,10 @@ restart:
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
continue;
}
@@ -336,19 +355,7 @@ restart:
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
- /*
- * We might be racing with another sender who queued a message but
- * backed off on noticing that we held the c_send_lock. If we check
- * for queued messages after dropping the sem then either we'll
- * see the queued message or the queuer will get the sem. If we
- * notice the queued message then we trigger an immediate retry.
- *
- * We need to be careful only to do this when we stopped processing
- * the send queue because it was empty. It's the only way we
- * stop processing the loop when the transport hasn't taken
- * responsibility for forward progress.
- */
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
+ release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- atomic_dec(&conn->c_senders);
-
/*
- * Other senders will see we have c_send_lock and exit. We
- * need to recheck the send queue and race again for c_send_lock
- * to make sure messages don't just sit on the send queue, if
- * somebody hasn't already beat us into the loop.
+ * Other senders can queue a message after we last test the send queue
+ * but before we clear RDS_IN_XMIT. In that case they'd back off and
+ * not try and send their newly queued message. We need to check the
+ * send queue after having cleared RDS_IN_XMIT so that their message
+ * doesn't get stuck on the send queue.
*
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
smp_mb();
if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced);
- if (gen == atomic_read(&conn->c_send_generation)) {
- goto restart;
- }
+ goto restart;
}
}
out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 7a8ca7a1d983..2bab9bf07b91 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -61,7 +61,7 @@
*
* Transition to state DISCONNECTING/DOWN:
* - Inside the shutdown worker; synchronizes with xmit path
- * through c_send_lock, and with connection management callbacks
+ * through RDS_IN_XMIT, and with connection management callbacks
* via c_cm_lock.
*
* For receive callbacks, we rely on the underlying transport
OpenPOWER on IntegriCloud