aboutsummaryrefslogtreecommitdiff
path: root/net/mptcp
diff options
context:
space:
mode:
Diffstat (limited to 'net/mptcp')
-rw-r--r--net/mptcp/protocol.c364
-rw-r--r--net/mptcp/protocol.h9
-rw-r--r--net/mptcp/subflow.c32
3 files changed, 289 insertions, 116 deletions
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 3c19a8efdcea..a8445407d25a 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -31,6 +31,12 @@ struct mptcp6_sock {
};
#endif
+struct mptcp_skb_cb {
+ u32 offset;
+};
+
+#define MPTCP_SKB_CB(__skb) ((struct mptcp_skb_cb *)&((__skb)->cb[0]))
+
/* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not
* completed yet or has failed, return the subflow socket.
* Otherwise return NULL.
@@ -111,6 +117,141 @@ static struct sock *mptcp_subflow_get(const struct mptcp_sock *msk)
return NULL;
}
+static void __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
+ struct sk_buff *skb,
+ unsigned int offset, size_t copy_len)
+{
+ struct sock *sk = (struct sock *)msk;
+
+ __skb_unlink(skb, &ssk->sk_receive_queue);
+ skb_set_owner_r(skb, sk);
+ __skb_queue_tail(&sk->sk_receive_queue, skb);
+
+ msk->ack_seq += copy_len;
+ MPTCP_SKB_CB(skb)->offset = offset;
+}
+
+static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
+ struct sock *ssk,
+ unsigned int *bytes)
+{
+ struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
+ struct sock *sk = (struct sock *)msk;
+ unsigned int moved = 0;
+ bool more_data_avail;
+ struct tcp_sock *tp;
+ bool done = false;
+ int rcvbuf;
+
+ rcvbuf = max(ssk->sk_rcvbuf, sk->sk_rcvbuf);
+ if (rcvbuf > sk->sk_rcvbuf)
+ sk->sk_rcvbuf = rcvbuf;
+
+ tp = tcp_sk(ssk);
+ do {
+ u32 map_remaining, offset;
+ u32 seq = tp->copied_seq;
+ struct sk_buff *skb;
+ bool fin;
+
+ /* try to move as much data as available */
+ map_remaining = subflow->map_data_len -
+ mptcp_subflow_get_map_offset(subflow);
+
+ skb = skb_peek(&ssk->sk_receive_queue);
+ if (!skb)
+ break;
+
+ offset = seq - TCP_SKB_CB(skb)->seq;
+ fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
+ if (fin) {
+ done = true;
+ seq++;
+ }
+
+ if (offset < skb->len) {
+ size_t len = skb->len - offset;
+
+ if (tp->urg_data)
+ done = true;
+
+ __mptcp_move_skb(msk, ssk, skb, offset, len);
+ seq += len;
+ moved += len;
+
+ if (WARN_ON_ONCE(map_remaining < len))
+ break;
+ } else {
+ WARN_ON_ONCE(!fin);
+ sk_eat_skb(ssk, skb);
+ done = true;
+ }
+
+ WRITE_ONCE(tp->copied_seq, seq);
+ more_data_avail = mptcp_subflow_data_available(ssk);
+
+ if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf)) {
+ done = true;
+ break;
+ }
+ } while (more_data_avail);
+
+ *bytes = moved;
+
+ return done;
+}
+
+/* In most cases we will be able to lock the mptcp socket. If its already
+ * owned, we need to defer to the work queue to avoid ABBA deadlock.
+ */
+static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+{
+ struct sock *sk = (struct sock *)msk;
+ unsigned int moved = 0;
+
+ if (READ_ONCE(sk->sk_lock.owned))
+ return false;
+
+ if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
+ return false;
+
+ /* must re-check after taking the lock */
+ if (!READ_ONCE(sk->sk_lock.owned))
+ __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+
+ spin_unlock_bh(&sk->sk_lock.slock);
+
+ return moved > 0;
+}
+
+void mptcp_data_ready(struct sock *sk, struct sock *ssk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ set_bit(MPTCP_DATA_READY, &msk->flags);
+
+ if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) &&
+ move_skbs_to_msk(msk, ssk))
+ goto wake;
+
+ /* don't schedule if mptcp sk is (still) over limit */
+ if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf))
+ goto wake;
+
+ /* mptcp socket is owned, release_cb should retry */
+ if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED,
+ &sk->sk_tsq_flags)) {
+ sock_hold(sk);
+
+ /* need to try again, its possible release_cb() has already
+ * been called after the test_and_set_bit() above.
+ */
+ move_skbs_to_msk(msk, ssk);
+ }
+wake:
+ sk->sk_data_ready(sk);
+}
+
static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
{
if (!msk->cached_ext)
@@ -323,61 +464,83 @@ fallback:
return ret;
}
-int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
- unsigned int offset, size_t len)
+static void mptcp_wait_data(struct sock *sk, long *timeo)
{
- struct mptcp_read_arg *arg = desc->arg.data;
- size_t copy_len;
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- copy_len = min(desc->count, len);
+ sk_wait_event(sk, timeo,
+ test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
- if (likely(arg->msg)) {
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+}
+
+static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
+ struct msghdr *msg,
+ size_t len)
+{
+ struct sock *sk = (struct sock *)msk;
+ struct sk_buff *skb;
+ int copied = 0;
+
+ while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
+ u32 offset = MPTCP_SKB_CB(skb)->offset;
+ u32 data_len = skb->len - offset;
+ u32 count = min_t(size_t, len - copied, data_len);
int err;
- err = skb_copy_datagram_msg(skb, offset, arg->msg, copy_len);
- if (err) {
- pr_debug("error path");
- desc->error = err;
- return err;
+ err = skb_copy_datagram_msg(skb, offset, msg, count);
+ if (unlikely(err < 0)) {
+ if (!copied)
+ return err;
+ break;
+ }
+
+ copied += count;
+
+ if (count < data_len) {
+ MPTCP_SKB_CB(skb)->offset += count;
+ break;
}
- } else {
- pr_debug("Flushing skb payload");
- }
- desc->count -= copy_len;
+ __skb_unlink(skb, &sk->sk_receive_queue);
+ __kfree_skb(skb);
- pr_debug("consumed %zu bytes, %zu left", copy_len, desc->count);
- return copy_len;
+ if (copied >= len)
+ break;
+ }
+
+ return copied;
}
-static void mptcp_wait_data(struct sock *sk, long *timeo)
+static bool __mptcp_move_skbs(struct mptcp_sock *msk)
{
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
- struct mptcp_sock *msk = mptcp_sk(sk);
+ unsigned int moved = 0;
+ bool done;
- add_wait_queue(sk_sleep(sk), &wait);
- sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ do {
+ struct sock *ssk = mptcp_subflow_recv_lookup(msk);
- sk_wait_event(sk, timeo,
- test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
+ if (!ssk)
+ break;
- sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- remove_wait_queue(sk_sleep(sk), &wait);
+ lock_sock(ssk);
+ done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+ release_sock(ssk);
+ } while (!done);
+
+ return moved > 0;
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- struct mptcp_subflow_context *subflow;
- bool more_data_avail = false;
- struct mptcp_read_arg arg;
- read_descriptor_t desc;
- bool wait_data = false;
struct socket *ssock;
- struct tcp_sock *tp;
- bool done = false;
- struct sock *ssk;
int copied = 0;
int target;
long timeo;
@@ -395,65 +558,26 @@ fallback:
return copied;
}
- arg.msg = msg;
- desc.arg.data = &arg;
- desc.error = 0;
-
timeo = sock_rcvtimeo(sk, nonblock);
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- while (!done) {
- u32 map_remaining;
+ while (len > (size_t)copied) {
int bytes_read;
- ssk = mptcp_subflow_recv_lookup(msk);
- pr_debug("msk=%p ssk=%p", msk, ssk);
- if (!ssk)
- goto wait_for_data;
-
- subflow = mptcp_subflow_ctx(ssk);
- tp = tcp_sk(ssk);
-
- lock_sock(ssk);
- do {
- /* try to read as much data as available */
- map_remaining = subflow->map_data_len -
- mptcp_subflow_get_map_offset(subflow);
- desc.count = min_t(size_t, len - copied, map_remaining);
- pr_debug("reading %zu bytes, copied %d", desc.count,
- copied);
- bytes_read = tcp_read_sock(ssk, &desc,
- mptcp_read_actor);
- if (bytes_read < 0) {
- if (!copied)
- copied = bytes_read;
- done = true;
- goto next;
- }
+ bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied);
+ if (unlikely(bytes_read < 0)) {
+ if (!copied)
+ copied = bytes_read;
+ goto out_err;
+ }
- pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
- msk->ack_seq + bytes_read);
- msk->ack_seq += bytes_read;
- copied += bytes_read;
- if (copied >= len) {
- done = true;
- goto next;
- }
- if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
- pr_err("Urgent data present, cannot proceed");
- done = true;
- goto next;
- }
-next:
- more_data_avail = mptcp_subflow_data_available(ssk);
- } while (more_data_avail && !done);
- release_sock(ssk);
- continue;
+ copied += bytes_read;
-wait_for_data:
- more_data_avail = false;
+ if (skb_queue_empty(&sk->sk_receive_queue) &&
+ __mptcp_move_skbs(msk))
+ continue;
/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
@@ -494,26 +618,25 @@ wait_for_data:
}
pr_debug("block timeout %ld", timeo);
- wait_data = true;
mptcp_wait_data(sk, &timeo);
if (unlikely(__mptcp_tcp_fallback(msk)))
goto fallback;
}
- if (more_data_avail) {
- if (!test_bit(MPTCP_DATA_READY, &msk->flags))
- set_bit(MPTCP_DATA_READY, &msk->flags);
- } else if (!wait_data) {
+ if (skb_queue_empty(&sk->sk_receive_queue)) {
+ /* entire backlog drained, clear DATA_READY. */
clear_bit(MPTCP_DATA_READY, &msk->flags);
- /* .. race-breaker: ssk might get new data after last
- * data_available() returns false.
+ /* .. race-breaker: ssk might have gotten new data
+ * after last __mptcp_move_skbs() returned false.
*/
- ssk = mptcp_subflow_recv_lookup(msk);
- if (unlikely(ssk))
+ if (unlikely(__mptcp_move_skbs(msk)))
set_bit(MPTCP_DATA_READY, &msk->flags);
+ } else if (unlikely(!test_bit(MPTCP_DATA_READY, &msk->flags))) {
+ /* data to read but mptcp_wait_data() cleared DATA_READY */
+ set_bit(MPTCP_DATA_READY, &msk->flags);
}
-
+out_err:
release_sock(sk);
return copied;
}
@@ -548,12 +671,24 @@ static unsigned int mptcp_sync_mss(struct sock *sk, u32 pmtu)
return 0;
}
+static void mptcp_worker(struct work_struct *work)
+{
+ struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work);
+ struct sock *sk = &msk->sk.icsk_inet.sk;
+
+ lock_sock(sk);
+ __mptcp_move_skbs(msk);
+ release_sock(sk);
+ sock_put(sk);
+}
+
static int __mptcp_init_sock(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
INIT_LIST_HEAD(&msk->conn_list);
__set_bit(MPTCP_SEND_SPACE, &msk->flags);
+ INIT_WORK(&msk->work, mptcp_worker);
msk->first = NULL;
inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss;
@@ -569,6 +704,14 @@ static int mptcp_init_sock(struct sock *sk)
return __mptcp_init_sock(sk);
}
+static void mptcp_cancel_work(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (cancel_work_sync(&msk->work))
+ sock_put(sk);
+}
+
static void mptcp_subflow_shutdown(struct sock *ssk, int how)
{
lock_sock(ssk);
@@ -614,6 +757,10 @@ static void mptcp_close(struct sock *sk, long timeout)
__mptcp_close_ssk(sk, ssk, subflow, timeout);
}
+ mptcp_cancel_work(sk);
+
+ __skb_queue_purge(&sk->sk_receive_queue);
+
sk_common_release(sk);
}
@@ -807,6 +954,32 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}
+#define MPTCP_DEFERRED_ALL TCPF_DELACK_TIMER_DEFERRED
+
+/* this is very alike tcp_release_cb() but we must handle differently a
+ * different set of events
+ */
+static void mptcp_release_cb(struct sock *sk)
+{
+ unsigned long flags, nflags;
+
+ do {
+ flags = sk->sk_tsq_flags;
+ if (!(flags & MPTCP_DEFERRED_ALL))
+ return;
+ nflags = flags & ~MPTCP_DEFERRED_ALL;
+ } while (cmpxchg(&sk->sk_tsq_flags, flags, nflags) != flags);
+
+ if (flags & TCPF_DELACK_TIMER_DEFERRED) {
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct sock *ssk;
+
+ ssk = mptcp_subflow_recv_lookup(msk);
+ if (!ssk || !schedule_work(&msk->work))
+ __sock_put(sk);
+ }
+}
+
static int mptcp_get_port(struct sock *sk, unsigned short snum)
{
struct mptcp_sock *msk = mptcp_sk(sk);
@@ -882,6 +1055,7 @@ static struct proto mptcp_prot = {
.destroy = mptcp_destroy,
.sendmsg = mptcp_sendmsg,
.recvmsg = mptcp_recvmsg,
+ .release_cb = mptcp_release_cb,
.hash = inet_hash,
.unhash = inet_unhash,
.get_port = mptcp_get_port,
@@ -1180,6 +1354,8 @@ void mptcp_proto_init(void)
panic("Failed to register MPTCP proto.\n");
inet_register_protosw(&mptcp_protosw);
+
+ BUILD_BUG_ON(sizeof(struct mptcp_skb_cb) > sizeof_field(struct sk_buff, cb));
}
#if IS_ENABLED(CONFIG_MPTCP_IPV6)
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 9f8663b30456..6c0b2c8ab674 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -70,6 +70,7 @@ struct mptcp_sock {
u32 token;
unsigned long flags;
bool can_ack;
+ struct work_struct work;
struct list_head conn_list;
struct skb_ext *cached_ext; /* for the next sendmsg */
struct socket *subflow; /* outgoing connect/listener/!mp_capable */
@@ -190,17 +191,11 @@ void mptcp_proto_init(void);
int mptcp_proto_v6_init(void);
#endif
-struct mptcp_read_arg {
- struct msghdr *msg;
-};
-
-int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
- unsigned int offset, size_t len);
-
void mptcp_get_options(const struct sk_buff *skb,
struct tcp_options_received *opt_rx);
void mptcp_finish_connect(struct sock *sk);
+void mptcp_data_ready(struct sock *sk, struct sock *ssk);
int mptcp_token_new_request(struct request_sock *req);
void mptcp_token_destroy_request(u32 token);
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index 65122edf60aa..0de2a44bdaa0 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -408,6 +408,18 @@ validate_seq:
return MAPPING_OK;
}
+static int subflow_read_actor(read_descriptor_t *desc,
+ struct sk_buff *skb,
+ unsigned int offset, size_t len)
+{
+ size_t copy_len = min(desc->count, len);
+
+ desc->count -= copy_len;
+
+ pr_debug("flushed %zu bytes, %zu left", copy_len, desc->count);
+ return copy_len;
+}
+
static bool subflow_check_data_avail(struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
@@ -482,16 +494,12 @@ static bool subflow_check_data_avail(struct sock *ssk)
pr_debug("discarding %zu bytes, current map len=%d", delta,
map_remaining);
if (delta) {
- struct mptcp_read_arg arg = {
- .msg = NULL,
- };
read_descriptor_t desc = {
.count = delta,
- .arg.data = &arg,
};
int ret;
- ret = tcp_read_sock(ssk, &desc, mptcp_read_actor);
+ ret = tcp_read_sock(ssk, &desc, subflow_read_actor);
if (ret < 0) {
ssk->sk_err = -ret;
goto fatal;
@@ -554,11 +562,8 @@ static void subflow_data_ready(struct sock *sk)
return;
}
- if (mptcp_subflow_data_available(sk)) {
- set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
-
- parent->sk_data_ready(parent);
- }
+ if (mptcp_subflow_data_available(sk))
+ mptcp_data_ready(parent, sk);
}
static void subflow_write_space(struct sock *sk)
@@ -690,11 +695,8 @@ static void subflow_state_change(struct sock *sk)
* a fin packet carrying a DSS can be unnoticed if we don't trigger
* the data available machinery here.
*/
- if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk)) {
- set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
-
- parent->sk_data_ready(parent);
- }
+ if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk))
+ mptcp_data_ready(parent, sk);
if (parent && !(parent->sk_shutdown & RCV_SHUTDOWN) &&
!subflow->rx_eof && subflow_is_done(sk)) {