diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2017-02-22 10:15:09 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2017-02-22 10:15:09 -0800 |
commit | 3051bf36c25d5153051704291782f8d44e744d36 (patch) | |
tree | 72dfc8a1d12675c6f2981d13102df954b678f11b /net/tipc/socket.c | |
parent | 1e74a2eb1f5cc7f2f2b5aa9c9eeecbcf352220a3 (diff) | |
parent | 005c3490e9db23738d91e02788606c0fe4734723 (diff) |
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
Pull networking updates from David Miller:
"Highlights:
1) Support TX_RING in AF_PACKET TPACKET_V3 mode, from Sowmini
Varadhan.
2) Simplify classifier state on sk_buff in order to shrink it a bit.
From Willem de Bruijn.
3) Introduce SIPHASH and it's usage for secure sequence numbers and
syncookies. From Jason A. Donenfeld.
4) Reduce CPU usage for ICMP replies we are going to limit or
suppress, from Jesper Dangaard Brouer.
5) Introduce Shared Memory Communications socket layer, from Ursula
Braun.
6) Add RACK loss detection and allow it to actually trigger fast
recovery instead of just assisting after other algorithms have
triggered it. From Yuchung Cheng.
7) Add xmit_more and BQL support to mvneta driver, from Simon Guinot.
8) skb_cow_data avoidance in esp4 and esp6, from Steffen Klassert.
9) Export MPLS packet stats via netlink, from Robert Shearman.
10) Significantly improve inet port bind conflict handling, especially
when an application is restarted and changes it's setting of
reuseport. From Josef Bacik.
11) Implement TX batching in vhost_net, from Jason Wang.
12) Extend the dummy device so that VF (virtual function) features,
such as configuration, can be more easily tested. From Phil
Sutter.
13) Avoid two atomic ops per page on x86 in bnx2x driver, from Eric
Dumazet.
14) Add new bpf MAP, implementing a longest prefix match trie. From
Daniel Mack.
15) Packet sample offloading support in mlxsw driver, from Yotam Gigi.
16) Add new aquantia driver, from David VomLehn.
17) Add bpf tracepoints, from Daniel Borkmann.
18) Add support for port mirroring to b53 and bcm_sf2 drivers, from
Florian Fainelli.
19) Remove custom busy polling in many drivers, it is done in the core
networking since 4.5 times. From Eric Dumazet.
20) Support XDP adjust_head in virtio_net, from John Fastabend.
21) Fix several major holes in neighbour entry confirmation, from
Julian Anastasov.
22) Add XDP support to bnxt_en driver, from Michael Chan.
23) VXLAN offloads for enic driver, from Govindarajulu Varadarajan.
24) Add IPVTAP driver (IP-VLAN based tap driver) from Sainath Grandhi.
25) Support GRO in IPSEC protocols, from Steffen Klassert"
* git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next: (1764 commits)
Revert "ath10k: Search SMBIOS for OEM board file extension"
net: socket: fix recvmmsg not returning error from sock_error
bnxt_en: use eth_hw_addr_random()
bpf: fix unlocking of jited image when module ronx not set
arch: add ARCH_HAS_SET_MEMORY config
net: napi_watchdog() can use napi_schedule_irqoff()
tcp: Revert "tcp: tcp_probe: use spin_lock_bh()"
net/hsr: use eth_hw_addr_random()
net: mvpp2: enable building on 64-bit platforms
net: mvpp2: switch to build_skb() in the RX path
net: mvpp2: simplify MVPP2_PRS_RI_* definitions
net: mvpp2: fix indentation of MVPP2_EXT_GLOBAL_CTRL_DEFAULT
net: mvpp2: remove unused register definitions
net: mvpp2: simplify mvpp2_bm_bufs_add()
net: mvpp2: drop useless fields in mvpp2_bm_pool and related code
net: mvpp2: remove unused 'tx_skb' field of 'struct mvpp2_tx_queue'
net: mvpp2: release reference to txq_cpu[] entry after unmapping
net: mvpp2: handle too large value in mvpp2_rx_time_coal_set()
net: mvpp2: handle too large value handling in mvpp2_rx_pkts_coal_set()
net: mvpp2: remove useless arguments in mvpp2_rx_{pkts, time}_coal_set
...
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 525 |
1 files changed, 258 insertions, 267 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 800caaa699a1..6b09a778cc71 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -67,16 +67,19 @@ enum { * @max_pkt: maximum packet size "hint" used when building messages sent by port * @portid: unique port identity in TIPC socket hash table * @phdr: preformatted message header used when sending messages + * #cong_links: list of congested links * @publications: list of publications for port + * @blocking_link: address of the congested link we are currently sleeping on * @pub_count: total # of publications port has made during its lifetime * @probing_state: * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - * @link_cong: non-zero if owner must sleep because of link congestion + * @cong_link_cnt: number of congested links * @sent_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node + * @mc_method: cookie for use between socket and broadcast layer * @rcu: rcu struct for tipc_sock */ struct tipc_sock { @@ -87,13 +90,13 @@ struct tipc_sock { u32 max_pkt; u32 portid; struct tipc_msg phdr; - struct list_head sock_list; + struct list_head cong_links; struct list_head publications; u32 pub_count; uint conn_timeout; atomic_t dupl_rcvcnt; bool probe_unacked; - bool link_cong; + u16 cong_link_cnt; u16 snt_unacked; u16 snd_win; u16 peer_caps; @@ -101,6 +104,7 @@ struct tipc_sock { u16 rcv_win; struct sockaddr_tipc peer; struct rhash_head node; + struct tipc_mc_method mc_method; struct rcu_head rcu; }; @@ -110,7 +114,6 @@ static void tipc_write_space(struct sock *sk); static void tipc_sock_destruct(struct sock *sk); static int tipc_release(struct socket *sock); static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); static void tipc_sk_timeout(unsigned long data); static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); @@ -119,8 +122,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, - size_t dsz); +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); static const struct proto_ops packet_ops; @@ -334,6 +336,49 @@ static int tipc_set_sk_state(struct sock *sk, int state) return res; } +static int tipc_sk_sock_err(struct socket *sock, long *timeout) +{ + struct sock *sk = sock->sk; + int err = sock_error(sk); + int typ = sock->type; + + if (err) + return err; + if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) { + if (sk->sk_state == TIPC_DISCONNECTING) + return -EPIPE; + else if (!tipc_sk_connected(sk)) + return -ENOTCONN; + } + if (!*timeout) + return -EAGAIN; + if (signal_pending(current)) + return sock_intr_errno(*timeout); + + return 0; +} + +#define tipc_wait_for_cond(sock_, timeout_, condition_) \ +({ \ + int rc_ = 0; \ + int done_ = 0; \ + \ + while (!(condition_) && !done_) { \ + struct sock *sk_ = sock->sk; \ + DEFINE_WAIT_FUNC(wait_, woken_wake_function); \ + \ + rc_ = tipc_sk_sock_err(sock_, timeout_); \ + if (rc_) \ + break; \ + prepare_to_wait(sk_sleep(sk_), &wait_, \ + TASK_INTERRUPTIBLE); \ + done_ = sk_wait_event(sk_, timeout_, \ + (condition_), &wait_); \ + remove_wait_queue(sk_sleep(sk_), &wait_); \ + } \ + rc_; \ +}) + /** * tipc_sk_create - create a TIPC socket * @net: network namespace (must be default network) @@ -382,10 +427,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk = tipc_sk(sk); tsk->max_pkt = MAX_PKT_DEFAULT; INIT_LIST_HEAD(&tsk->publications); + INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; tn = net_generic(sock_net(sk), tipc_net_id); - tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, - NAMED_H_SIZE, 0); /* Finish initializing socket data structures */ sock->ops = ops; @@ -395,6 +439,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock, pr_warn("Socket create failed; port number exhausted\n"); return -EINVAL; } + + /* Ensure tsk is visible before we read own_addr. */ + smp_mb(); + + tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, + NAMED_H_SIZE, 0); + msg_set_origport(msg, tsk->portid); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); sk->sk_shutdown = 0; @@ -432,9 +483,14 @@ static void __tipc_shutdown(struct socket *sock, int error) struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); + long timeout = CONN_TIMEOUT_DEFAULT; u32 dnode = tsk_peer_node(tsk); struct sk_buff *skb; + /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */ + tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && + !tsk_conn_cong(tsk))); + /* Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer). */ @@ -505,7 +561,8 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ release_sock(sk); - + u32_list_purge(&tsk->cong_links); + tsk->cong_link_cnt = 0; call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; @@ -648,7 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch (sk->sk_state) { case TIPC_ESTABLISHED: - if (!tsk->link_cong && !tsk_conn_cong(tsk)) + if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: @@ -657,7 +714,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->link_cong) + if (!tsk->cong_link_cnt) mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) @@ -676,63 +733,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, * @sock: socket structure * @seq: destination address * @msg: message to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup + * @dlen: length of data to send + * @timeout: timeout to wait for wakeup * * Called from function tipc_sendmsg(), which has done all sanity checks * Returns the number of bytes sent on success, or errno */ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, - struct msghdr *msg, size_t dsz, long timeo) + struct msghdr *msg, size_t dlen, long timeout) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; - struct iov_iter save = msg->msg_iter; - uint mtu; + int mtu = tipc_bcast_get_mtu(net); + struct tipc_mc_method *method = &tsk->mc_method; + u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE); + struct sk_buff_head pkts; + struct tipc_nlist dsts; int rc; - if (!timeo && tsk->link_cong) - return -ELINKCONG; + /* Block or return if any destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; - msg_set_type(mhdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(mhdr, 0); - msg_set_destnode(mhdr, 0); - msg_set_nametype(mhdr, seq->type); - msg_set_namelower(mhdr, seq->lower); - msg_set_nameupper(mhdr, seq->upper); - msg_set_hdr_sz(mhdr, MCAST_H_SIZE); + /* Lookup destination nodes */ + tipc_nlist_init(&dsts, tipc_own_addr(net)); + tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, + seq->upper, domain, &dsts); + if (!dsts.local && !dsts.remote) + return -EHOSTUNREACH; - skb_queue_head_init(&pktchain); + /* Build message header */ + msg_set_type(hdr, TIPC_MCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nametype(hdr, seq->type); + msg_set_namelower(hdr, seq->lower); + msg_set_nameupper(hdr, seq->upper); -new_mtu: - mtu = tipc_bcast_get_mtu(net); - rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); - if (unlikely(rc < 0)) - return rc; + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); - do { - rc = tipc_bcast_xmit(net, &pktchain); - if (likely(!rc)) - return dsz; - - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - msg->msg_iter = save; - goto new_mtu; - } - break; - } while (1); - return rc; + /* Send message if build was successful */ + if (unlikely(rc == dlen)) + rc = tipc_mcast_xmit(net, &pkts, method, &dsts, + &tsk->cong_link_cnt); + + tipc_nlist_purge(&dsts); + + return rc ? rc : dlen; } /** @@ -746,7 +800,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { struct tipc_msg *msg; - struct tipc_plist dports; + struct list_head dports; u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; struct sk_buff_head tmpq; @@ -754,7 +808,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff *skb, *_skb; __skb_queue_head_init(&tmpq); - tipc_plist_init(&dports); + INIT_LIST_HEAD(&dports); skb = tipc_skb_peek(arrvq, &inputq->lock); for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { @@ -768,8 +822,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nameupper(msg), scope, &dports); - portid = tipc_plist_pop(&dports); - for (; portid; portid = tipc_plist_pop(&dports)) { + portid = u32_pop(&dports); + for (; portid; portid = u32_pop(&dports)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { msg_set_destport(buf_msg(_skb), portid); @@ -830,31 +884,6 @@ exit: kfree_skb(skb); } -static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) -{ - DEFINE_WAIT_FUNC(wait, woken_wake_function); - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - int done; - - do { - int err = sock_error(sk); - if (err) - return err; - if (sk->sk_shutdown & SEND_SHUTDOWN) - return -EPIPE; - if (!*timeo_p) - return -EAGAIN; - if (signal_pending(current)) - return sock_intr_errno(*timeo_p); - - add_wait_queue(sk_sleep(sk), &wait); - done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait); - remove_wait_queue(sk_sleep(sk), &wait); - } while (!done); - return 0; -} - /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -881,35 +910,38 @@ static int tipc_sendmsg(struct socket *sock, return ret; } -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) { - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - u32 dnode, dport; - struct sk_buff_head pktchain; - bool is_connectionless = tipc_sk_type_connectionless(sk); - struct sk_buff *skb; + struct tipc_sock *tsk = tipc_sk(sk); + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct list_head *clinks = &tsk->cong_links; + bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; - struct iov_iter save; - u32 mtu; - long timeo; - int rc; + struct sk_buff_head pkts; + u32 type, inst, domain; + u32 dnode, dport; + int mtu, rc; - if (dsz > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(!dest)) { - if (is_connectionless && tsk->peer.family == AF_TIPC) - dest = &tsk->peer; - else + dest = &tsk->peer; + if (!syn || dest->family != AF_TIPC) return -EDESTADDRREQ; - } else if (unlikely(m->msg_namelen < sizeof(*dest)) || - dest->family != AF_TIPC) { - return -EINVAL; } - if (!is_connectionless) { + + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + + if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; if (sk->sk_state != TIPC_OPEN) @@ -921,102 +953,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) tsk->conn_instance = dest->addr.name.name.instance; } } - seq = &dest->addr.nameseq; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (dest->addrtype == TIPC_ADDR_MCAST) { - return tipc_sendmcast(sock, seq, m, dsz, timeo); - } else if (dest->addrtype == TIPC_ADDR_NAME) { - u32 type = dest->addr.name.name.type; - u32 inst = dest->addr.name.name.instance; - u32 domain = dest->addr.name.domain; + seq = &dest->addr.nameseq; + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_sendmcast(sock, seq, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_NAME) { + type = dest->addr.name.name.type; + inst = dest->addr.name.name.instance; + domain = dest->addr.name.domain; dnode = domain; - msg_set_type(mhdr, TIPC_NAMED_MSG); - msg_set_hdr_sz(mhdr, NAMED_H_SIZE); - msg_set_nametype(mhdr, type); - msg_set_nameinst(mhdr, inst); - msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + msg_set_type(hdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(hdr, NAMED_H_SIZE); + msg_set_nametype(hdr, type); + msg_set_nameinst(hdr, inst); + msg_set_lookup_scope(hdr, tipc_addr_scope(domain)); dport = tipc_nametbl_translate(net, type, inst, &dnode); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dport); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; + } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; - msg_set_type(mhdr, TIPC_DIRECT_MSG); - msg_set_lookup_scope(mhdr, 0); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dest->addr.id.ref); - msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + msg_set_type(hdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(hdr, 0); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dest->addr.id.ref); + msg_set_hdr_sz(hdr, BASIC_H_SIZE); } - skb_queue_head_init(&pktchain); - save = m->msg_iter; -new_mtu: - mtu = tipc_node_get_mtu(net, dnode, tsk->portid); - rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); - if (rc < 0) + /* Block or return if destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); + if (unlikely(rc)) return rc; - do { - skb = skb_peek(&pktchain); - TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); - if (likely(!rc)) { - if (!is_connectionless) - tipc_set_sk_state(sk, TIPC_CONNECTING); - return dsz; - } - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_sndmsg(sock, &timeo); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - m->msg_iter = save; - goto new_mtu; - } - break; - } while (1); - - return rc; -} + skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; -static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) -{ - DEFINE_WAIT_FUNC(wait, woken_wake_function); - struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - int done; + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + u32_push(clinks, dnode); + tsk->cong_link_cnt++; + rc = 0; + } - do { - int err = sock_error(sk); - if (err) - return err; - if (sk->sk_state == TIPC_DISCONNECTING) - return -EPIPE; - else if (!tipc_sk_connected(sk)) - return -ENOTCONN; - if (!*timeo_p) - return -EAGAIN; - if (signal_pending(current)) - return sock_intr_errno(*timeo_p); + if (unlikely(syn && !rc)) + tipc_set_sk_state(sk, TIPC_CONNECTING); - add_wait_queue(sk_sleep(sk), &wait); - done = sk_wait_event(sk, timeo_p, - (!tsk->link_cong && - !tsk_conn_cong(tsk)) || - !tipc_sk_connected(sk), &wait); - remove_wait_queue(sk_sleep(sk), &wait); - } while (!done); - return 0; + return rc ? rc : dlen; } /** - * tipc_send_stream - send stream-oriented data + * tipc_sendstream - send stream-oriented data * @sock: socket structure * @m: data to send * @dsz: total length of data to be transmitted @@ -1026,94 +1018,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) * Returns the number of bytes sent on success (or partial success), * or errno if no data sent */ -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; int ret; lock_sock(sk); - ret = __tipc_send_stream(sock, m, dsz); + ret = __tipc_sendstream(sock, m, dsz); release_sock(sk); return ret; } -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) { struct sock *sk = sock->sk; - struct net *net = sock_net(sk); - struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - u32 portid = tsk->portid; - int rc = -EINVAL; - long timeo; - u32 dnode; - uint mtu, send, sent = 0; - struct iov_iter save; - int hlen = MIN_H_SIZE; - - /* Handle implied connection establishment */ - if (unlikely(dest)) { - rc = __tipc_sendmsg(sock, m, dsz); - hlen = msg_hdr_sz(mhdr); - if (dsz && (dsz == rc)) - tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); - return rc; - } - if (dsz > (uint)INT_MAX) - return -EMSGSIZE; - - if (unlikely(!tipc_sk_connected(sk))) { - if (sk->sk_state == TIPC_DISCONNECTING) - return -EPIPE; - else - return -ENOTCONN; - } + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; + struct net *net = sock_net(sk); + struct sk_buff_head pkts; + u32 dnode = tsk_peer_node(tsk); + int send, sent = 0; + int rc = 0; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (!timeo && tsk->link_cong) - return -ELINKCONG; + skb_queue_head_init(&pkts); - dnode = tsk_peer_node(tsk); - skb_queue_head_init(&pktchain); + if (unlikely(dlen > INT_MAX)) + return -EMSGSIZE; -next: - save = m->msg_iter; - mtu = tsk->max_pkt; - send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); - if (unlikely(rc < 0)) + /* Handle implicit connection setup */ + if (unlikely(dest)) { + rc = __tipc_sendmsg(sock, m, dlen); + if (dlen && (dlen == rc)) + tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr)); return rc; + } do { - if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_node_xmit(net, &pktchain, dnode, portid); - if (likely(!rc)) { - tsk->snt_unacked += tsk_inc(tsk, send + hlen); - sent += send; - if (sent == dsz) - return dsz; - goto next; - } - if (rc == -EMSGSIZE) { - __skb_queue_purge(&pktchain); - tsk->max_pkt = tipc_node_get_mtu(net, dnode, - portid); - m->msg_iter = save; - goto next; - } - if (rc != -ELINKCONG) - break; + rc = tipc_wait_for_cond(sock, &timeout, + (!tsk->cong_link_cnt && + !tsk_conn_cong(tsk) && + tipc_sk_connected(sk))); + if (unlikely(rc)) + break; - tsk->link_cong = 1; + send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); + if (unlikely(rc != send)) + break; + + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tsk->cong_link_cnt = 1; + rc = 0; } - rc = tipc_wait_for_sndpkt(sock, &timeo); - } while (!rc); + if (likely(!rc)) { + tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); + sent += send; + } + } while (sent < dlen && !rc); - __skb_queue_purge(&pktchain); - return sent ? sent : rc; + return rc ? rc : sent; } /** @@ -1131,7 +1098,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz) if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; - return tipc_send_stream(sock, m, dsz); + return tipc_sendstream(sock, m, dsz); } /* tipc_sk_finish_conn - complete the setup of a connection @@ -1698,6 +1665,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, unsigned int limit = rcvbuf_limit(sk, skb); int err = TIPC_OK; int usr = msg_user(hdr); + u32 onode; if (unlikely(msg_user(hdr) == CONN_MANAGER)) { tipc_sk_proto_rcv(tsk, skb, xmitq); @@ -1705,8 +1673,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, } if (unlikely(usr == SOCK_WAKEUP)) { + onode = msg_orignode(hdr); kfree_skb(skb); - tsk->link_cong = 0; + u32_del(&tsk->cong_links, onode); + tsk->cong_link_cnt--; sk->sk_write_space(sk); return false; } @@ -2114,7 +2084,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) struct msghdr m = {NULL,}; tsk_advance_rx_queue(sk); - __tipc_send_stream(new_sock, &m, 0); + __tipc_sendstream(new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); __skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2269,24 +2239,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, void tipc_sk_reinit(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); - const struct bucket_table *tbl; - struct rhash_head *pos; + struct rhashtable_iter iter; struct tipc_sock *tsk; struct tipc_msg *msg; - int i; - rcu_read_lock(); - tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); - for (i = 0; i < tbl->size; i++) { - rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { + rhashtable_walk_enter(&tn->sk_rht, &iter); + + do { + tsk = ERR_PTR(rhashtable_walk_start(&iter)); + if (tsk) + continue; + + while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) { spin_lock_bh(&tsk->sk.sk_lock.slock); msg = &tsk->phdr; msg_set_prevnode(msg, tn->own_addr); msg_set_orignode(msg, tn->own_addr); spin_unlock_bh(&tsk->sk.sk_lock.slock); } - } - rcu_read_unlock(); + + rhashtable_walk_stop(&iter); + } while (tsk == ERR_PTR(-EAGAIN)); } static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) @@ -2382,18 +2355,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - u32 value; - int res; + u32 value = 0; + int res = 0; if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) return 0; if (lvl != SOL_TIPC) return -ENOPROTOOPT; - if (ol < sizeof(value)) - return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + + switch (opt) { + case TIPC_IMPORTANCE: + case TIPC_SRC_DROPPABLE: + case TIPC_DEST_DROPPABLE: + case TIPC_CONN_TIMEOUT: + if (ol < sizeof(value)) + return -EINVAL; + res = get_user(value, (u32 __user *)ov); + if (res) + return res; + break; + default: + if (ov || ol) + return -EINVAL; + } lock_sock(sk); @@ -2412,7 +2396,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; - /* no need to set "res", since already 0 at this point */ + break; + case TIPC_MCAST_BROADCAST: + tsk->mc_method.rcast = false; + tsk->mc_method.mandatory = true; + break; + case TIPC_MCAST_REPLICAST: + tsk->mc_method.rcast = true; + tsk->mc_method.mandatory = true; break; default: res = -EINVAL; @@ -2575,7 +2566,7 @@ static const struct proto_ops stream_ops = { .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, .getsockopt = tipc_getsockopt, - .sendmsg = tipc_send_stream, + .sendmsg = tipc_sendstream, .recvmsg = tipc_recv_stream, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage |