aboutsummaryrefslogtreecommitdiff
path: root/net/rds/send.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/send.c')
-rw-r--r--net/rds/send.c73
1 files changed, 58 insertions, 15 deletions
diff --git a/net/rds/send.c b/net/rds/send.c
index 77c8c6e613ad..41b9f0f5bb9c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -170,8 +170,8 @@ restart:
* The acquire_in_xmit() check above ensures that only one
* caller can increment c_send_gen at any time.
*/
- cp->cp_send_gen++;
- send_gen = cp->cp_send_gen;
+ send_gen = READ_ONCE(cp->cp_send_gen) + 1;
+ WRITE_ONCE(cp->cp_send_gen, send_gen);
/*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
@@ -431,7 +431,7 @@ over_batch:
smp_mb();
if ((test_bit(0, &conn->c_map_queued) ||
!list_empty(&cp->cp_send_queue)) &&
- send_gen == cp->cp_send_gen) {
+ send_gen == READ_ONCE(cp->cp_send_gen)) {
rds_stats_inc(s_send_lock_queue_raced);
if (batch_count < send_batch_count)
goto restart;
@@ -476,12 +476,14 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
struct rm_rdma_op *ro;
struct rds_notifier *notifier;
unsigned long flags;
+ unsigned int notify = 0;
spin_lock_irqsave(&rm->m_rs_lock, flags);
+ notify = rm->rdma.op_notify | rm->data.op_notify;
ro = &rm->rdma;
if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
- ro->op_active && ro->op_notify && ro->op_notifier) {
+ ro->op_active && notify && ro->op_notifier) {
notifier = ro->op_notifier;
rs = rm->m_rs;
sock_hold(rds_rs_to_sk(rs));
@@ -945,6 +947,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
ret = rds_cmsg_rdma_map(rs, rm, cmsg);
if (!ret)
*allocated_mr = 1;
+ else if (ret == -ENODEV)
+ /* Accommodate the get_mr() case which can fail
+ * if connection isn't established yet.
+ */
+ ret = -EAGAIN;
break;
case RDS_CMSG_ATOMIC_CSWP:
case RDS_CMSG_ATOMIC_FADD:
@@ -964,8 +971,6 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
return ret;
}
-static void rds_send_ping(struct rds_connection *conn);
-
static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
{
int hash;
@@ -975,7 +980,7 @@ static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
else
hash = RDS_MPATH_HASH(rs, conn->c_npaths);
if (conn->c_npaths == 0 && hash != 0) {
- rds_send_ping(conn);
+ rds_send_ping(conn, 0);
if (conn->c_npaths == 0) {
wait_event_interruptible(conn->c_hs_waitq,
@@ -987,6 +992,26 @@ static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
return hash;
}
+static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
+{
+ struct rds_rdma_args *args;
+ struct cmsghdr *cmsg;
+
+ for_each_cmsghdr(cmsg, msg) {
+ if (!CMSG_OK(msg, cmsg))
+ return -EINVAL;
+
+ if (cmsg->cmsg_level != SOL_RDS)
+ continue;
+
+ if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
+ args = CMSG_DATA(cmsg);
+ *rdma_bytes += args->remote_vec.bytes;
+ }
+ }
+ return 0;
+}
+
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
struct sock *sk = sock->sk;
@@ -1001,6 +1026,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
int nonblock = msg->msg_flags & MSG_DONTWAIT;
long timeo = sock_sndtimeo(sk, nonblock);
struct rds_conn_path *cpath;
+ size_t total_payload_len = payload_len, rdma_payload_len = 0;
/* Mirror Linux UDP mirror of BSD error message compatibility */
/* XXX: Perhaps MSG_MORE someday */
@@ -1033,6 +1059,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}
release_sock(sk);
+ ret = rds_rdma_bytes(msg, &rdma_payload_len);
+ if (ret)
+ goto out;
+
+ total_payload_len += rdma_payload_len;
+ if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
+ ret = -EMSGSIZE;
+ goto out;
+ }
+
if (payload_len > rds_sk_sndbuf(rs)) {
ret = -EMSGSIZE;
goto out;
@@ -1082,8 +1118,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
/* Parse any control messages the user may have included. */
ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
- if (ret)
+ if (ret) {
+ /* Trigger connection so that its ready for the next retry */
+ if (ret == -EAGAIN)
+ rds_conn_connect_if_down(conn);
goto out;
+ }
if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
@@ -1169,7 +1209,7 @@ out:
* or
* RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
*/
-int
+static int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
__be16 dport, u8 h_flags)
{
@@ -1204,15 +1244,17 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
rm->m_inc.i_hdr.h_flags |= h_flags;
cp->cp_next_tx_seq++;
- if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
- u16 npaths = RDS_MPATH_WORKERS;
+ if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
+ cp->cp_conn->c_trans->t_mp_capable) {
+ u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
+ u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_NPATHS, &npaths,
sizeof(npaths));
rds_message_add_extension(&rm->m_inc.i_hdr,
RDS_EXTHDR_GEN_NUM,
- &cp->cp_conn->c_my_gen_num,
+ &my_gen_num,
sizeof(u32));
}
spin_unlock_irqrestore(&cp->cp_lock, flags);
@@ -1239,10 +1281,10 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport)
}
void
-rds_send_ping(struct rds_connection *conn)
+rds_send_ping(struct rds_connection *conn, int cp_index)
{
unsigned long flags;
- struct rds_conn_path *cp = &conn->c_path[0];
+ struct rds_conn_path *cp = &conn->c_path[cp_index];
spin_lock_irqsave(&cp->cp_lock, flags);
if (conn->c_ping_triggered) {
@@ -1251,5 +1293,6 @@ rds_send_ping(struct rds_connection *conn)
}
conn->c_ping_triggered = 1;
spin_unlock_irqrestore(&cp->cp_lock, flags);
- rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0);
+ rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
}
+EXPORT_SYMBOL_GPL(rds_send_ping);