summaryrefslogtreecommitdiffstats
path: root/net/rds
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/connection.c20
-rw-r--r--net/rds/ib_cm.c13
-rw-r--r--net/rds/rds.h8
-rw-r--r--net/rds/recv.c4
-rw-r--r--net/rds/send.c36
-rw-r--r--net/rds/tcp_connect.c1
-rw-r--r--net/rds/tcp_listen.c46
7 files changed, 113 insertions, 15 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 378c3a6acf84..da6da57e5f36 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -126,11 +126,14 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
struct rds_transport *loop_trans;
unsigned long flags;
int ret;
+ struct rds_transport *otrans = trans;
+ if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP)
+ goto new_conn;
rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
- !is_outgoing) {
+ laddr == faddr && !is_outgoing) {
/* This is a looped back IB connection, and we're
* called by the code handling the incoming connect.
* We need a second connection object into which we
@@ -142,6 +145,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
if (conn)
goto out;
+new_conn:
conn = kmem_cache_zalloc(rds_conn_slab, gfp);
if (!conn) {
conn = ERR_PTR(-ENOMEM);
@@ -193,6 +197,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
}
atomic_set(&conn->c_state, RDS_CONN_DOWN);
+ conn->c_send_gen = 0;
conn->c_reconnect_jiffies = 0;
INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
@@ -229,13 +234,22 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
/* Creating normal conn */
struct rds_connection *found;
- found = rds_conn_lookup(head, laddr, faddr, trans);
+ if (!is_outgoing && otrans->t_type == RDS_TRANS_TCP)
+ found = NULL;
+ else
+ found = rds_conn_lookup(head, laddr, faddr, trans);
if (found) {
trans->conn_free(conn->c_transport_data);
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
- hlist_add_head_rcu(&conn->c_hash_node, head);
+ if ((is_outgoing && otrans->t_type == RDS_TRANS_TCP) ||
+ (otrans->t_type != RDS_TRANS_TCP)) {
+ /* Only the active side should be added to
+ * reconnect list for TCP.
+ */
+ hlist_add_head_rcu(&conn->c_hash_node, head);
+ }
rds_cong_add_conn(conn);
rds_conn_count++;
}
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 31b74f5e61ad..8a09ee7db3c1 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -183,8 +183,17 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
/* If the peer gave us the last packet it saw, process this as if
* we had received a regular ACK. */
- if (dp && dp->dp_ack_seq)
- rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
+ if (dp) {
+ /* dp structure start is not guaranteed to be 8 bytes aligned.
+ * Since dp_ack_seq is 64-bit extended load operations can be
+ * used so go through get_unaligned to avoid unaligned errors.
+ */
+ __be64 dp_ack_seq = get_unaligned(&dp->dp_ack_seq);
+
+ if (dp_ack_seq)
+ rds_send_drop_acked(conn, be64_to_cpu(dp_ack_seq),
+ NULL);
+ }
rds_connect_complete(conn);
}
diff --git a/net/rds/rds.h b/net/rds/rds.h
index c2a5eef41343..0d41155a2258 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -110,6 +110,7 @@ struct rds_connection {
void *c_transport_data;
atomic_t c_state;
+ unsigned long c_send_gen;
unsigned long c_flags;
unsigned long c_reconnect_jiffies;
struct delayed_work c_send_w;
@@ -702,8 +703,8 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
void rds_inc_put(struct rds_incoming *inc);
void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
struct rds_incoming *inc, gfp_t gfp);
-int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
- size_t size, int msg_flags);
+int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
+ int msg_flags);
void rds_clear_recv_queue(struct rds_sock *rs);
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msg);
void rds_inc_info_copy(struct rds_incoming *inc,
@@ -711,8 +712,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
__be32 saddr, __be32 daddr, int flip);
/* send.c */
-int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
- size_t payload_len);
+int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
void rds_send_reset(struct rds_connection *conn);
int rds_send_xmit(struct rds_connection *conn);
struct sockaddr_in;
diff --git a/net/rds/recv.c b/net/rds/recv.c
index f9ec1acd801c..a00462b0d01d 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -395,8 +395,8 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg)
return 0;
}
-int rds_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
- size_t size, int msg_flags)
+int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
+ int msg_flags)
{
struct sock *sk = sock->sk;
struct rds_sock *rs = rds_sk_to_rs(sk);
diff --git a/net/rds/send.c b/net/rds/send.c
index 42f65d4305c8..e9430f537f9c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
struct scatterlist *sg;
int ret = 0;
LIST_HEAD(to_be_dropped);
+ int batch_count;
+ unsigned long send_gen = 0;
restart:
+ batch_count = 0;
/*
* sendmsg calls here after having queued its message on the send
@@ -157,6 +160,17 @@ restart:
}
/*
+ * we record the send generation after doing the xmit acquire.
+ * if someone else manages to jump in and do some work, we'll use
+ * this to avoid a goto restart farther down.
+ *
+ * The acquire_in_xmit() check above ensures that only one
+ * caller can increment c_send_gen at any time.
+ */
+ conn->c_send_gen++;
+ send_gen = conn->c_send_gen;
+
+ /*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races.
*/
@@ -202,6 +216,16 @@ restart:
if (!rm) {
unsigned int len;
+ batch_count++;
+
+ /* we want to process as big a batch as we can, but
+ * we also want to avoid softlockups. If we've been
+ * through a lot of messages, lets back off and see
+ * if anyone else jumps in
+ */
+ if (batch_count >= 1024)
+ goto over_batch;
+
spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
}
}
+over_batch:
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
-
release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
* completion handler.
+ *
+ * We have an extra generation check here so that if someone manages
+ * to jump in after our release_in_xmit, we'll see that they have done
+ * some work and we will skip our goto
*/
if (ret == 0) {
smp_mb();
- if (!list_empty(&conn->c_send_queue)) {
+ if (!list_empty(&conn->c_send_queue) &&
+ send_gen == conn->c_send_gen) {
rds_stats_inc(s_send_lock_queue_raced);
goto restart;
}
@@ -920,8 +949,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}
-int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
- size_t payload_len)
+int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
struct sock *sk = sock->sk;
struct rds_sock *rs = rds_sk_to_rs(sk);
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index f9f564a6c960..973109c7b8e8 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -62,6 +62,7 @@ void rds_tcp_state_change(struct sock *sk)
case TCP_ESTABLISHED:
rds_connect_complete(conn);
break;
+ case TCP_CLOSE_WAIT:
case TCP_CLOSE:
rds_conn_drop(conn);
default:
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 23ab4dcd1d9f..0da49e34495f 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -45,12 +45,45 @@ static void rds_tcp_accept_worker(struct work_struct *work);
static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker);
static struct socket *rds_tcp_listen_sock;
+static int rds_tcp_keepalive(struct socket *sock)
+{
+ /* values below based on xs_udp_default_timeout */
+ int keepidle = 5; /* send a probe 'keepidle' secs after last data */
+ int keepcnt = 5; /* number of unack'ed probes before declaring dead */
+ int keepalive = 1;
+ int ret = 0;
+
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ (char *)&keepalive, sizeof(keepalive));
+ if (ret < 0)
+ goto bail;
+
+ ret = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPCNT,
+ (char *)&keepcnt, sizeof(keepcnt));
+ if (ret < 0)
+ goto bail;
+
+ ret = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPIDLE,
+ (char *)&keepidle, sizeof(keepidle));
+ if (ret < 0)
+ goto bail;
+
+ /* KEEPINTVL is the interval between successive probes. We follow
+ * the model in xs_tcp_finish_connecting() and re-use keepidle.
+ */
+ ret = kernel_setsockopt(sock, IPPROTO_TCP, TCP_KEEPINTVL,
+ (char *)&keepidle, sizeof(keepidle));
+bail:
+ return ret;
+}
+
static int rds_tcp_accept_one(struct socket *sock)
{
struct socket *new_sock = NULL;
struct rds_connection *conn;
int ret;
struct inet_sock *inet;
+ struct rds_tcp_connection *rs_tcp;
ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
sock->sk->sk_protocol, &new_sock);
@@ -63,6 +96,10 @@ static int rds_tcp_accept_one(struct socket *sock)
if (ret < 0)
goto out;
+ ret = rds_tcp_keepalive(new_sock);
+ if (ret < 0)
+ goto out;
+
rds_tcp_tune(new_sock);
inet = inet_sk(new_sock->sk);
@@ -77,6 +114,15 @@ static int rds_tcp_accept_one(struct socket *sock)
ret = PTR_ERR(conn);
goto out;
}
+ /* An incoming SYN request came in, and TCP just accepted it.
+ * We always create a new conn for listen side of TCP, and do not
+ * add it to the c_hash_list.
+ *
+ * If the client reboots, this conn will need to be cleaned up.
+ * rds_tcp_state_change() will do that cleanup
+ */
+ rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data;
+ WARN_ON(!rs_tcp || rs_tcp->t_sock);
/*
* see the comment above rds_queue_delayed_reconnect()
OpenPOWER on IntegriCloud