diff options
Diffstat (limited to 'net/tipc/socket.c')
| -rw-r--r-- | net/tipc/socket.c | 1015 | 
1 files changed, 453 insertions, 562 deletions
| diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 4731cad99d1c..f73e975af80b 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@  /*   * net/tipc/socket.c: TIPC socket API   * - * Copyright (c) 2001-2007, 2012-2014, Ericsson AB + * Copyright (c) 2001-2007, 2012-2015, Ericsson AB   * Copyright (c) 2004-2008, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -34,22 +34,25 @@   * POSSIBILITY OF SUCH DAMAGE.   */ +#include <linux/rhashtable.h> +#include <linux/jhash.h>  #include "core.h"  #include "name_table.h"  #include "node.h"  #include "link.h" -#include <linux/export.h> -#include "config.h" +#include "name_distr.h"  #include "socket.h" -#define SS_LISTENING	-1	/* socket is listening */ -#define SS_READY	-2	/* socket is connectionless */ +#define SS_LISTENING		-1	/* socket is listening */ +#define SS_READY		-2	/* socket is connectionless */ -#define CONN_TIMEOUT_DEFAULT  8000	/* default connect timeout = 8s */ -#define CONN_PROBING_INTERVAL 3600000	/* [ms] => 1 h */ -#define TIPC_FWD_MSG	      1 -#define TIPC_CONN_OK          0 -#define TIPC_CONN_PROBING     1 +#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */ +#define CONN_PROBING_INTERVAL	msecs_to_jiffies(3600000)  /* [ms] => 1 h */ +#define TIPC_FWD_MSG		1 +#define TIPC_CONN_OK		0 +#define TIPC_CONN_PROBING	1 +#define TIPC_MAX_PORT		0xffffffff +#define TIPC_MIN_PORT		1  /**   * struct tipc_sock - TIPC socket structure @@ -59,21 +62,20 @@   * @conn_instance: TIPC instance used when connection was established   * @published: non-zero if port has one or more associated names   * @max_pkt: maximum packet size "hint" used when building messages sent by port - * @ref: unique reference to port in TIPC object registry + * @portid: unique port identity in TIPC socket hash table   * @phdr: preformatted message header used when sending messages   * @port_list: adjacent ports in TIPC's global list of ports   * @publications: list of publications for port   * @pub_count: total # of publications port has made during its lifetime   * @probing_state: - * @probing_interval: - * @timer: - * @port: port - interacts with 'sk' and with the rest of the TIPC stack - * @peer_name: the peer of the connection, if any + * @probing_intv:   * @conn_timeout: the time we can wait for an unresponded setup request   * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue   * @link_cong: non-zero if owner must sleep because of link congestion   * @sent_unacked: # messages sent by socket, and not yet acked by peer   * @rcv_unacked: # messages read by user, but not yet acked back to peer + * @node: hash table node + * @rcu: rcu struct for tipc_sock   */  struct tipc_sock {  	struct sock sk; @@ -82,19 +84,20 @@ struct tipc_sock {  	u32 conn_instance;  	int published;  	u32 max_pkt; -	u32 ref; +	u32 portid;  	struct tipc_msg phdr;  	struct list_head sock_list;  	struct list_head publications;  	u32 pub_count;  	u32 probing_state; -	u32 probing_interval; -	struct timer_list timer; +	unsigned long probing_intv;  	uint conn_timeout;  	atomic_t dupl_rcvcnt;  	bool link_cong;  	uint sent_unacked;  	uint rcv_unacked; +	struct rhash_head node; +	struct rcu_head rcu;  };  static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -103,16 +106,14 @@ static void tipc_write_space(struct sock *sk);  static int tipc_release(struct socket *sock);  static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);  static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p); -static void tipc_sk_timeout(unsigned long ref); +static void tipc_sk_timeout(unsigned long data);  static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,  			   struct tipc_name_seq const *seq);  static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  			    struct tipc_name_seq const *seq); -static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk); -static void tipc_sk_ref_discard(u32 ref); -static struct tipc_sock *tipc_sk_get(u32 ref); -static struct tipc_sock *tipc_sk_get_next(u32 *ref); -static void tipc_sk_put(struct tipc_sock *tsk); +static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); +static int tipc_sk_insert(struct tipc_sock *tsk); +static void tipc_sk_remove(struct tipc_sock *tsk);  static const struct proto_ops packet_ops;  static const struct proto_ops stream_ops; @@ -174,6 +175,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {   *   - port reference   */ +static u32 tsk_own_node(struct tipc_sock *tsk) +{ +	return msg_prevnode(&tsk->phdr); +} +  static u32 tsk_peer_node(struct tipc_sock *tsk)  {  	return msg_destnode(&tsk->phdr); @@ -246,10 +252,11 @@ static void tsk_rej_rx_queue(struct sock *sk)  {  	struct sk_buff *skb;  	u32 dnode; +	u32 own_node = tsk_own_node(tipc_sk(sk));  	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { -		if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) -			tipc_link_xmit_skb(skb, dnode, 0); +		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) +			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);  	}  } @@ -260,6 +267,7 @@ static void tsk_rej_rx_queue(struct sock *sk)   */  static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)  { +	struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);  	u32 peer_port = tsk_peer_port(tsk);  	u32 orig_node;  	u32 peer_node; @@ -276,10 +284,10 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)  	if (likely(orig_node == peer_node))  		return true; -	if (!orig_node && (peer_node == tipc_own_addr)) +	if (!orig_node && (peer_node == tn->own_addr))  		return true; -	if (!peer_node && (orig_node == tipc_own_addr)) +	if (!peer_node && (orig_node == tn->own_addr))  		return true;  	return false; @@ -300,12 +308,12 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)  static int tipc_sk_create(struct net *net, struct socket *sock,  			  int protocol, int kern)  { +	struct tipc_net *tn;  	const struct proto_ops *ops;  	socket_state state;  	struct sock *sk;  	struct tipc_sock *tsk;  	struct tipc_msg *msg; -	u32 ref;  	/* Validate arguments */  	if (unlikely(protocol != 0)) @@ -339,24 +347,23 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  		return -ENOMEM;  	tsk = tipc_sk(sk); -	ref = tipc_sk_ref_acquire(tsk); -	if (!ref) { -		pr_warn("Socket create failed; reference table exhausted\n"); -		return -ENOMEM; -	}  	tsk->max_pkt = MAX_PKT_DEFAULT; -	tsk->ref = ref;  	INIT_LIST_HEAD(&tsk->publications);  	msg = &tsk->phdr; -	tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, +	tn = net_generic(sock_net(sk), tipc_net_id); +	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,  		      NAMED_H_SIZE, 0); -	msg_set_origport(msg, ref);  	/* Finish initializing socket data structures */  	sock->ops = ops;  	sock->state = state;  	sock_init_data(sock, sk); -	k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref); +	if (tipc_sk_insert(tsk)) { +		pr_warn("Socket create failed; port numbrer exhausted\n"); +		return -EINVAL; +	} +	msg_set_origport(msg, tsk->portid); +	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);  	sk->sk_backlog_rcv = tipc_backlog_rcv;  	sk->sk_rcvbuf = sysctl_tipc_rmem[1];  	sk->sk_data_ready = tipc_data_ready; @@ -384,7 +391,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,   *   * Returns 0 on success, errno otherwise   */ -int tipc_sock_create_local(int type, struct socket **res) +int tipc_sock_create_local(struct net *net, int type, struct socket **res)  {  	int rc; @@ -393,7 +400,7 @@ int tipc_sock_create_local(int type, struct socket **res)  		pr_err("Failed to create kernel socket\n");  		return rc;  	} -	tipc_sk_create(&init_net, *res, 0, 1); +	tipc_sk_create(net, *res, 0, 1);  	return 0;  } @@ -442,6 +449,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,  	return ret;  } +static void tipc_sk_callback(struct rcu_head *head) +{ +	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu); + +	sock_put(&tsk->sk); +} +  /**   * tipc_release - destroy a TIPC socket   * @sock: socket to destroy @@ -461,9 +475,10 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,  static int tipc_release(struct socket *sock)  {  	struct sock *sk = sock->sk; +	struct net *net;  	struct tipc_sock *tsk;  	struct sk_buff *skb; -	u32 dnode; +	u32 dnode, probing_state;  	/*  	 * Exit if socket isn't fully initialized (occurs when a failed accept() @@ -472,6 +487,7 @@ static int tipc_release(struct socket *sock)  	if (sk == NULL)  		return 0; +	net = sock_net(sk);  	tsk = tipc_sk(sk);  	lock_sock(sk); @@ -491,26 +507,29 @@ static int tipc_release(struct socket *sock)  			    (sock->state == SS_CONNECTED)) {  				sock->state = SS_DISCONNECTING;  				tsk->connected = 0; -				tipc_node_remove_conn(dnode, tsk->ref); +				tipc_node_remove_conn(net, dnode, tsk->portid);  			} -			if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT)) -				tipc_link_xmit_skb(skb, dnode, 0); +			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, +					     TIPC_ERR_NO_PORT)) +				tipc_link_xmit_skb(net, skb, dnode, 0);  		}  	}  	tipc_sk_withdraw(tsk, 0, NULL); -	tipc_sk_ref_discard(tsk->ref); -	k_cancel_timer(&tsk->timer); +	probing_state = tsk->probing_state; +	if (del_timer_sync(&sk->sk_timer) && +	    probing_state != TIPC_CONN_PROBING) +		sock_put(sk); +	tipc_sk_remove(tsk);  	if (tsk->connected) { -		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, -				      SHORT_H_SIZE, 0, dnode, tipc_own_addr, -				      tsk_peer_port(tsk), -				      tsk->ref, TIPC_ERR_NO_PORT); +		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, +				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode, +				      tsk_own_node(tsk), tsk_peer_port(tsk), +				      tsk->portid, TIPC_ERR_NO_PORT);  		if (skb) -			tipc_link_xmit_skb(skb, dnode, tsk->ref); -		tipc_node_remove_conn(dnode, tsk->ref); +			tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +		tipc_node_remove_conn(net, dnode, tsk->portid);  	} -	k_term_timer(&tsk->timer);  	/* Discard any remaining (connection-based) messages in receive queue */  	__skb_queue_purge(&sk->sk_receive_queue); @@ -518,7 +537,8 @@ static int tipc_release(struct socket *sock)  	/* Reject any messages that accumulated in backlog queue */  	sock->state = SS_DISCONNECTING;  	release_sock(sk); -	sock_put(sk); + +	call_rcu(&tsk->rcu, tipc_sk_callback);  	sock->sk = NULL;  	return 0; @@ -602,6 +622,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,  {  	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;  	struct tipc_sock *tsk = tipc_sk(sock->sk); +	struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);  	memset(addr, 0, sizeof(*addr));  	if (peer) { @@ -611,8 +632,8 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,  		addr->addr.id.ref = tsk_peer_port(tsk);  		addr->addr.id.node = tsk_peer_node(tsk);  	} else { -		addr->addr.id.ref = tsk->ref; -		addr->addr.id.node = tipc_own_addr; +		addr->addr.id.ref = tsk->portid; +		addr->addr.id.node = tn->own_addr;  	}  	*uaddr_len = sizeof(*addr); @@ -711,8 +732,11 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,  			  struct msghdr *msg, size_t dsz, long timeo)  {  	struct sock *sk = sock->sk; -	struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; -	struct sk_buff_head head; +	struct tipc_sock *tsk = tipc_sk(sk); +	struct net *net = sock_net(sk); +	struct tipc_msg *mhdr = &tsk->phdr; +	struct sk_buff_head *pktchain = &sk->sk_write_queue; +	struct iov_iter save = msg->msg_iter;  	uint mtu;  	int rc; @@ -727,83 +751,97 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,  new_mtu:  	mtu = tipc_bclink_get_mtu(); -	__skb_queue_head_init(&head); -	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head); +	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);  	if (unlikely(rc < 0))  		return rc;  	do { -		rc = tipc_bclink_xmit(&head); +		rc = tipc_bclink_xmit(net, pktchain);  		if (likely(rc >= 0)) {  			rc = dsz;  			break;  		} -		if (rc == -EMSGSIZE) +		if (rc == -EMSGSIZE) { +			msg->msg_iter = save;  			goto new_mtu; +		}  		if (rc != -ELINKCONG)  			break;  		tipc_sk(sk)->link_cong = 1;  		rc = tipc_wait_for_sndmsg(sock, &timeo);  		if (rc) -			__skb_queue_purge(&head); +			__skb_queue_purge(pktchain);  	} while (!rc);  	return rc;  } -/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets +/** + * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets + * @arrvq: queue with arriving messages, to be cloned after destination lookup + * @inputq: queue with cloned messages, delivered to socket after dest lookup + * + * Multi-threaded: parallel calls with reference to same queues may occur   */ -void tipc_sk_mcast_rcv(struct sk_buff *buf) +void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, +		       struct sk_buff_head *inputq)  { -	struct tipc_msg *msg = buf_msg(buf); -	struct tipc_port_list dports = {0, NULL, }; -	struct tipc_port_list *item; -	struct sk_buff *b; -	uint i, last, dst = 0; +	struct tipc_msg *msg; +	struct tipc_plist dports; +	u32 portid;  	u32 scope = TIPC_CLUSTER_SCOPE; - -	if (in_own_node(msg_orignode(msg))) -		scope = TIPC_NODE_SCOPE; - -	/* Create destination port list: */ -	tipc_nametbl_mc_translate(msg_nametype(msg), -				  msg_namelower(msg), -				  msg_nameupper(msg), -				  scope, -				  &dports); -	last = dports.count; -	if (!last) { -		kfree_skb(buf); -		return; -	} - -	for (item = &dports; item; item = item->next) { -		for (i = 0; i < PLSIZE && ++dst <= last; i++) { -			b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf; -			if (!b) { -				pr_warn("Failed do clone mcast rcv buffer\n"); +	struct sk_buff_head tmpq; +	uint hsz; +	struct sk_buff *skb, *_skb; + +	__skb_queue_head_init(&tmpq); +	tipc_plist_init(&dports); + +	skb = tipc_skb_peek(arrvq, &inputq->lock); +	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { +		msg = buf_msg(skb); +		hsz = skb_headroom(skb) + msg_hdr_sz(msg); + +		if (in_own_node(net, msg_orignode(msg))) +			scope = TIPC_NODE_SCOPE; + +		/* Create destination port list and message clones: */ +		tipc_nametbl_mc_translate(net, +					  msg_nametype(msg), msg_namelower(msg), +					  msg_nameupper(msg), scope, &dports); +		portid = tipc_plist_pop(&dports); +		for (; portid; portid = tipc_plist_pop(&dports)) { +			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC); +			if (_skb) { +				msg_set_destport(buf_msg(_skb), portid); +				__skb_queue_tail(&tmpq, _skb);  				continue;  			} -			msg_set_destport(msg, item->ports[i]); -			tipc_sk_rcv(b); +			pr_warn("Failed to clone mcast rcv buffer\n");  		} +		/* Append to inputq if not already done by other thread */ +		spin_lock_bh(&inputq->lock); +		if (skb_peek(arrvq) == skb) { +			skb_queue_splice_tail_init(&tmpq, inputq); +			kfree_skb(__skb_dequeue(arrvq)); +		} +		spin_unlock_bh(&inputq->lock); +		__skb_queue_purge(&tmpq); +		kfree_skb(skb);  	} -	tipc_port_list_free(&dports); +	tipc_sk_rcv(net, inputq);  }  /**   * tipc_sk_proto_rcv - receive a connection mng protocol message   * @tsk: receiving socket - * @dnode: node to send response message to, if any - * @buf: buffer containing protocol message - * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if - * (CONN_PROBE_REPLY) message should be forwarded. + * @skb: pointer to message buffer. Set to NULL if buffer is consumed.   */ -static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode, -			     struct sk_buff *buf) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)  { -	struct tipc_msg *msg = buf_msg(buf); +	struct tipc_msg *msg = buf_msg(*skb);  	int conn_cong; - +	u32 dnode; +	u32 own_node = tsk_own_node(tsk);  	/* Ignore if connection cannot be validated: */  	if (!tsk_peer_msg(tsk, msg))  		goto exit; @@ -816,15 +854,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,  		if (conn_cong)  			tsk->sk.sk_write_space(&tsk->sk);  	} else if (msg_type(msg) == CONN_PROBE) { -		if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) -			return TIPC_OK; -		msg_set_type(msg, CONN_PROBE_REPLY); -		return TIPC_FWD_MSG; +		if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { +			msg_set_type(msg, CONN_PROBE_REPLY); +			return; +		}  	}  	/* Do nothing if msg_type() == CONN_PROBE_REPLY */  exit: -	kfree_skb(buf); -	return TIPC_OK; +	kfree_skb(*skb); +	*skb = NULL;  }  static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) @@ -872,11 +910,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); +	struct net *net = sock_net(sk);  	struct tipc_msg *mhdr = &tsk->phdr;  	u32 dnode, dport; -	struct sk_buff_head head; +	struct sk_buff_head *pktchain = &sk->sk_write_queue;  	struct sk_buff *skb;  	struct tipc_name_seq *seq = &dest->addr.nameseq; +	struct iov_iter save;  	u32 mtu;  	long timeo;  	int rc; @@ -929,7 +969,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,  		msg_set_nametype(mhdr, type);  		msg_set_nameinst(mhdr, inst);  		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); -		dport = tipc_nametbl_translate(type, inst, &dnode); +		dport = tipc_nametbl_translate(net, type, inst, &dnode);  		msg_set_destnode(mhdr, dnode);  		msg_set_destport(mhdr, dport);  		if (unlikely(!dport && !dnode)) { @@ -945,31 +985,33 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,  		msg_set_hdr_sz(mhdr, BASIC_H_SIZE);  	} +	save = m->msg_iter;  new_mtu: -	mtu = tipc_node_get_mtu(dnode, tsk->ref); -	__skb_queue_head_init(&head); -	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head); +	mtu = tipc_node_get_mtu(net, dnode, tsk->portid); +	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);  	if (rc < 0)  		goto exit;  	do { -		skb = skb_peek(&head); +		skb = skb_peek(pktchain);  		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; -		rc = tipc_link_xmit(&head, dnode, tsk->ref); +		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);  		if (likely(rc >= 0)) {  			if (sock->state != SS_READY)  				sock->state = SS_CONNECTING;  			rc = dsz;  			break;  		} -		if (rc == -EMSGSIZE) +		if (rc == -EMSGSIZE) { +			m->msg_iter = save;  			goto new_mtu; +		}  		if (rc != -ELINKCONG)  			break;  		tsk->link_cong = 1;  		rc = tipc_wait_for_sndmsg(sock, &timeo);  		if (rc) -			__skb_queue_purge(&head); +			__skb_queue_purge(pktchain);  	} while (!rc);  exit:  	if (iocb) @@ -1024,15 +1066,17 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,  			    struct msghdr *m, size_t dsz)  {  	struct sock *sk = sock->sk; +	struct net *net = sock_net(sk);  	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_msg *mhdr = &tsk->phdr; -	struct sk_buff_head head; +	struct sk_buff_head *pktchain = &sk->sk_write_queue;  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); -	u32 ref = tsk->ref; +	u32 portid = tsk->portid;  	int rc = -EINVAL;  	long timeo;  	u32 dnode;  	uint mtu, send, sent = 0; +	struct iov_iter save;  	/* Handle implied connection establishment */  	if (unlikely(dest)) { @@ -1059,15 +1103,15 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,  	dnode = tsk_peer_node(tsk);  next: +	save = m->msg_iter;  	mtu = tsk->max_pkt;  	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); -	__skb_queue_head_init(&head); -	rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head); +	rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);  	if (unlikely(rc < 0))  		goto exit;  	do {  		if (likely(!tsk_conn_cong(tsk))) { -			rc = tipc_link_xmit(&head, dnode, ref); +			rc = tipc_link_xmit(net, pktchain, dnode, portid);  			if (likely(!rc)) {  				tsk->sent_unacked++;  				sent += send; @@ -1076,7 +1120,9 @@ next:  				goto next;  			}  			if (rc == -EMSGSIZE) { -				tsk->max_pkt = tipc_node_get_mtu(dnode, ref); +				tsk->max_pkt = tipc_node_get_mtu(net, dnode, +								 portid); +				m->msg_iter = save;  				goto next;  			}  			if (rc != -ELINKCONG) @@ -1085,7 +1131,7 @@ next:  		}  		rc = tipc_wait_for_sndpkt(sock, &timeo);  		if (rc) -			__skb_queue_purge(&head); +			__skb_queue_purge(pktchain);  	} while (!rc);  exit:  	if (iocb) @@ -1118,6 +1164,8 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,  static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,  				u32 peer_node)  { +	struct sock *sk = &tsk->sk; +	struct net *net = sock_net(sk);  	struct tipc_msg *msg = &tsk->phdr;  	msg_set_destnode(msg, peer_node); @@ -1126,12 +1174,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,  	msg_set_lookup_scope(msg, 0);  	msg_set_hdr_sz(msg, SHORT_H_SIZE); -	tsk->probing_interval = CONN_PROBING_INTERVAL; +	tsk->probing_intv = CONN_PROBING_INTERVAL;  	tsk->probing_state = TIPC_CONN_OK;  	tsk->connected = 1; -	k_start_timer(&tsk->timer, tsk->probing_interval); -	tipc_node_add_conn(peer_node, tsk->ref, peer_port); -	tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref); +	sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv); +	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); +	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);  }  /** @@ -1230,6 +1278,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,  static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)  { +	struct net *net = sock_net(&tsk->sk);  	struct sk_buff *skb = NULL;  	struct tipc_msg *msg;  	u32 peer_port = tsk_peer_port(tsk); @@ -1237,13 +1286,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)  	if (!tsk->connected)  		return; -	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, -			      tipc_own_addr, peer_port, tsk->ref, TIPC_OK); +	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, +			      dnode, tsk_own_node(tsk), peer_port, +			      tsk->portid, TIPC_OK);  	if (!skb)  		return;  	msg = buf_msg(skb);  	msg_set_msgcnt(msg, ack); -	tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg)); +	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));  }  static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1529,15 +1579,16 @@ static void tipc_data_ready(struct sock *sk)  /**   * filter_connect - Handle all incoming messages for a connection-based socket   * @tsk: TIPC socket - * @msg: message + * @skb: pointer to message buffer. Set to NULL if buffer is consumed   *   * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise   */ -static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) +static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)  {  	struct sock *sk = &tsk->sk; +	struct net *net = sock_net(sk);  	struct socket *sock = sk->sk_socket; -	struct tipc_msg *msg = buf_msg(*buf); +	struct tipc_msg *msg = buf_msg(*skb);  	int retval = -TIPC_ERR_NO_PORT;  	if (msg_mcast(msg)) @@ -1551,8 +1602,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)  				sock->state = SS_DISCONNECTING;  				tsk->connected = 0;  				/* let timer expire on it's own */ -				tipc_node_remove_conn(tsk_peer_node(tsk), -						      tsk->ref); +				tipc_node_remove_conn(net, tsk_peer_node(tsk), +						      tsk->portid);  			}  			retval = TIPC_OK;  		} @@ -1587,8 +1638,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)  		 * connect() routine if sleeping.  		 */  		if (msg_data_sz(msg) == 0) { -			kfree_skb(*buf); -			*buf = NULL; +			kfree_skb(*skb); +			*skb = NULL;  			if (waitqueue_active(sk_sleep(sk)))  				wake_up_interruptible(sk_sleep(sk));  		} @@ -1640,32 +1691,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)  /**   * filter_rcv - validate incoming message   * @sk: socket - * @buf: message + * @skb: pointer to message. Set to NULL if buffer is consumed.   *   * Enqueues message on receive queue if acceptable; optionally handles   * disconnect indication for a connected socket.   * - * Called with socket lock already taken; port lock may also be taken. + * Called with socket lock already taken   * - * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message - * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded + * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected   */ -static int filter_rcv(struct sock *sk, struct sk_buff *buf) +static int filter_rcv(struct sock *sk, struct sk_buff **skb)  {  	struct socket *sock = sk->sk_socket;  	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_msg *msg = buf_msg(buf); -	unsigned int limit = rcvbuf_limit(sk, buf); -	u32 onode; +	struct tipc_msg *msg = buf_msg(*skb); +	unsigned int limit = rcvbuf_limit(sk, *skb);  	int rc = TIPC_OK; -	if (unlikely(msg_user(msg) == CONN_MANAGER)) -		return tipc_sk_proto_rcv(tsk, &onode, buf); +	if (unlikely(msg_user(msg) == CONN_MANAGER)) { +		tipc_sk_proto_rcv(tsk, skb); +		return TIPC_OK; +	}  	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { -		kfree_skb(buf); +		kfree_skb(*skb);  		tsk->link_cong = 0;  		sk->sk_write_space(sk); +		*skb = NULL;  		return TIPC_OK;  	} @@ -1677,21 +1729,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)  		if (msg_connected(msg))  			return -TIPC_ERR_NO_PORT;  	} else { -		rc = filter_connect(tsk, &buf); -		if (rc != TIPC_OK || buf == NULL) +		rc = filter_connect(tsk, skb); +		if (rc != TIPC_OK || !*skb)  			return rc;  	}  	/* Reject message if there isn't room to queue it */ -	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit) +	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)  		return -TIPC_ERR_OVERLOAD;  	/* Enqueue message */ -	TIPC_SKB_CB(buf)->handle = NULL; -	__skb_queue_tail(&sk->sk_receive_queue, buf); -	skb_set_owner_r(buf, sk); +	TIPC_SKB_CB(*skb)->handle = NULL; +	__skb_queue_tail(&sk->sk_receive_queue, *skb); +	skb_set_owner_r(*skb, sk);  	sk->sk_data_ready(sk); +	*skb = NULL;  	return TIPC_OK;  } @@ -1700,78 +1753,125 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)   * @sk: socket   * @skb: message   * - * Caller must hold socket lock, but not port lock. + * Caller must hold socket lock   *   * Returns 0   */  static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)  { -	int rc; -	u32 onode; +	int err; +	atomic_t *dcnt; +	u32 dnode;  	struct tipc_sock *tsk = tipc_sk(sk); +	struct net *net = sock_net(sk);  	uint truesize = skb->truesize; -	rc = filter_rcv(sk, skb); - -	if (likely(!rc)) { -		if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) -			atomic_add(truesize, &tsk->dupl_rcvcnt); +	err = filter_rcv(sk, &skb); +	if (likely(!skb)) { +		dcnt = &tsk->dupl_rcvcnt; +		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) +			atomic_add(truesize, dcnt);  		return 0;  	} +	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) +		tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +	return 0; +} -	if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc)) -		return 0; - -	tipc_link_xmit_skb(skb, onode, 0); +/** + * tipc_sk_enqueue - extract all buffers with destination 'dport' from + *                   inputq and try adding them to socket or backlog queue + * @inputq: list of incoming buffers with potentially different destinations + * @sk: socket where the buffers should be enqueued + * @dport: port number for the socket + * @_skb: returned buffer to be forwarded or rejected, if applicable + * + * Caller must hold socket lock + * + * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD + * or -TIPC_ERR_NO_PORT + */ +static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, +			   u32 dport, struct sk_buff **_skb) +{ +	unsigned int lim; +	atomic_t *dcnt; +	int err; +	struct sk_buff *skb; +	unsigned long time_limit = jiffies + 2; -	return 0; +	while (skb_queue_len(inputq)) { +		if (unlikely(time_after_eq(jiffies, time_limit))) +			return TIPC_OK; +		skb = tipc_skb_dequeue(inputq, dport); +		if (unlikely(!skb)) +			return TIPC_OK; +		if (!sock_owned_by_user(sk)) { +			err = filter_rcv(sk, &skb); +			if (likely(!skb)) +				continue; +			*_skb = skb; +			return err; +		} +		dcnt = &tipc_sk(sk)->dupl_rcvcnt; +		if (sk->sk_backlog.len) +			atomic_set(dcnt, 0); +		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); +		if (likely(!sk_add_backlog(sk, skb, lim))) +			continue; +		*_skb = skb; +		return -TIPC_ERR_OVERLOAD; +	} +	return TIPC_OK;  }  /** - * tipc_sk_rcv - handle incoming message - * @skb: buffer containing arriving message - * Consumes buffer - * Returns 0 if success, or errno: -EHOSTUNREACH + * tipc_sk_rcv - handle a chain of incoming buffers + * @inputq: buffer list containing the buffers + * Consumes all buffers in list until inputq is empty + * Note: may be called in multiple threads referring to the same queue + * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH + * Only node local calls check the return value, sending single-buffer queues   */ -int tipc_sk_rcv(struct sk_buff *skb) +int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)  { +	u32 dnode, dport = 0; +	int err = -TIPC_ERR_NO_PORT; +	struct sk_buff *skb;  	struct tipc_sock *tsk; +	struct tipc_net *tn;  	struct sock *sk; -	u32 dport = msg_destport(buf_msg(skb)); -	int rc = TIPC_OK; -	uint limit; -	u32 dnode; -	/* Validate destination and message */ -	tsk = tipc_sk_get(dport); -	if (unlikely(!tsk)) { -		rc = tipc_msg_eval(skb, &dnode); -		goto exit; +	while (skb_queue_len(inputq)) { +		skb = NULL; +		dport = tipc_skb_peek_port(inputq, dport); +		tsk = tipc_sk_lookup(net, dport); +		if (likely(tsk)) { +			sk = &tsk->sk; +			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { +				err = tipc_sk_enqueue(inputq, sk, dport, &skb); +				spin_unlock_bh(&sk->sk_lock.slock); +				dport = 0; +			} +			sock_put(sk); +		} else { +			skb = tipc_skb_dequeue(inputq, dport); +		} +		if (likely(!skb)) +			continue; +		if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) +			goto xmit; +		if (!err) { +			dnode = msg_destnode(buf_msg(skb)); +			goto xmit; +		} +		tn = net_generic(net, tipc_net_id); +		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) +			continue; +xmit: +		tipc_link_xmit_skb(net, skb, dnode, dport);  	} -	sk = &tsk->sk; - -	/* Queue message */ -	spin_lock_bh(&sk->sk_lock.slock); - -	if (!sock_owned_by_user(sk)) { -		rc = filter_rcv(sk, skb); -	} else { -		if (sk->sk_backlog.len == 0) -			atomic_set(&tsk->dupl_rcvcnt, 0); -		limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt); -		if (sk_add_backlog(sk, skb, limit)) -			rc = -TIPC_ERR_OVERLOAD; -	} -	spin_unlock_bh(&sk->sk_lock.slock); -	tipc_sk_put(tsk); -	if (likely(!rc)) -		return 0; -exit: -	if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc)) -		return -EHOSTUNREACH; - -	tipc_link_xmit_skb(skb, dnode, 0); -	return (rc < 0) ? -EHOSTUNREACH : 0; +	return err ? -EHOSTUNREACH : 0;  }  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -2027,6 +2127,7 @@ exit:  static int tipc_shutdown(struct socket *sock, int how)  {  	struct sock *sk = sock->sk; +	struct net *net = sock_net(sk);  	struct tipc_sock *tsk = tipc_sk(sk);  	struct sk_buff *skb;  	u32 dnode; @@ -2049,21 +2150,24 @@ restart:  				kfree_skb(skb);  				goto restart;  			} -			if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN)) -				tipc_link_xmit_skb(skb, dnode, tsk->ref); -			tipc_node_remove_conn(dnode, tsk->ref); +			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, +					     TIPC_CONN_SHUTDOWN)) +				tipc_link_xmit_skb(net, skb, dnode, +						   tsk->portid); +			tipc_node_remove_conn(net, dnode, tsk->portid);  		} else {  			dnode = tsk_peer_node(tsk); +  			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,  					      TIPC_CONN_MSG, SHORT_H_SIZE, -					      0, dnode, tipc_own_addr, +					      0, dnode, tsk_own_node(tsk),  					      tsk_peer_port(tsk), -					      tsk->ref, TIPC_CONN_SHUTDOWN); -			tipc_link_xmit_skb(skb, dnode, tsk->ref); +					      tsk->portid, TIPC_CONN_SHUTDOWN); +			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);  		}  		tsk->connected = 0;  		sock->state = SS_DISCONNECTING; -		tipc_node_remove_conn(dnode, tsk->ref); +		tipc_node_remove_conn(net, dnode, tsk->portid);  		/* fall through */  	case SS_DISCONNECTING: @@ -2084,18 +2188,14 @@ restart:  	return res;  } -static void tipc_sk_timeout(unsigned long ref) +static void tipc_sk_timeout(unsigned long data)  { -	struct tipc_sock *tsk; -	struct sock *sk; +	struct tipc_sock *tsk = (struct tipc_sock *)data; +	struct sock *sk = &tsk->sk;  	struct sk_buff *skb = NULL;  	u32 peer_port, peer_node; +	u32 own_node = tsk_own_node(tsk); -	tsk = tipc_sk_get(ref); -	if (!tsk) -		return; - -	sk = &tsk->sk;  	bh_lock_sock(sk);  	if (!tsk->connected) {  		bh_unlock_sock(sk); @@ -2106,38 +2206,39 @@ static void tipc_sk_timeout(unsigned long ref)  	if (tsk->probing_state == TIPC_CONN_PROBING) {  		/* Previous probe not answered -> self abort */ -		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, -				      SHORT_H_SIZE, 0, tipc_own_addr, -				      peer_node, ref, peer_port, -				      TIPC_ERR_NO_PORT); +		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, +				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, +				      own_node, peer_node, tsk->portid, +				      peer_port, TIPC_ERR_NO_PORT);  	} else { -		skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, -				      0, peer_node, tipc_own_addr, -				      peer_port, ref, TIPC_OK); +		skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, +				      INT_H_SIZE, 0, peer_node, own_node, +				      peer_port, tsk->portid, TIPC_OK);  		tsk->probing_state = TIPC_CONN_PROBING; -		k_start_timer(&tsk->timer, tsk->probing_interval); +		sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);  	}  	bh_unlock_sock(sk);  	if (skb) -		tipc_link_xmit_skb(skb, peer_node, ref); +		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);  exit: -	tipc_sk_put(tsk); +	sock_put(sk);  }  static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,  			   struct tipc_name_seq const *seq)  { +	struct net *net = sock_net(&tsk->sk);  	struct publication *publ;  	u32 key;  	if (tsk->connected)  		return -EINVAL; -	key = tsk->ref + tsk->pub_count + 1; -	if (key == tsk->ref) +	key = tsk->portid + tsk->pub_count + 1; +	if (key == tsk->portid)  		return -EADDRINUSE; -	publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, -				    scope, tsk->ref, key); +	publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper, +				    scope, tsk->portid, key);  	if (unlikely(!publ))  		return -EINVAL; @@ -2150,6 +2251,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,  static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  			    struct tipc_name_seq const *seq)  { +	struct net *net = sock_net(&tsk->sk);  	struct publication *publ;  	struct publication *safe;  	int rc = -EINVAL; @@ -2164,12 +2266,12 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  				continue;  			if (publ->upper != seq->upper)  				break; -			tipc_nametbl_withdraw(publ->type, publ->lower, +			tipc_nametbl_withdraw(net, publ->type, publ->lower,  					      publ->ref, publ->key);  			rc = 0;  			break;  		} -		tipc_nametbl_withdraw(publ->type, publ->lower, +		tipc_nametbl_withdraw(net, publ->type, publ->lower,  				      publ->ref, publ->key);  		rc = 0;  	} @@ -2178,336 +2280,105 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  	return rc;  } -static int tipc_sk_show(struct tipc_sock *tsk, char *buf, -			int len, int full_id) -{ -	struct publication *publ; -	int ret; - -	if (full_id) -		ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:", -				    tipc_zone(tipc_own_addr), -				    tipc_cluster(tipc_own_addr), -				    tipc_node(tipc_own_addr), tsk->ref); -	else -		ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref); - -	if (tsk->connected) { -		u32 dport = tsk_peer_port(tsk); -		u32 destnode = tsk_peer_node(tsk); - -		ret += tipc_snprintf(buf + ret, len - ret, -				     " connected to <%u.%u.%u:%u>", -				     tipc_zone(destnode), -				     tipc_cluster(destnode), -				     tipc_node(destnode), dport); -		if (tsk->conn_type != 0) -			ret += tipc_snprintf(buf + ret, len - ret, -					     " via {%u,%u}", tsk->conn_type, -					     tsk->conn_instance); -	} else if (tsk->published) { -		ret += tipc_snprintf(buf + ret, len - ret, " bound to"); -		list_for_each_entry(publ, &tsk->publications, pport_list) { -			if (publ->lower == publ->upper) -				ret += tipc_snprintf(buf + ret, len - ret, -						     " {%u,%u}", publ->type, -						     publ->lower); -			else -				ret += tipc_snprintf(buf + ret, len - ret, -						     " {%u,%u,%u}", publ->type, -						     publ->lower, publ->upper); -		} -	} -	ret += tipc_snprintf(buf + ret, len - ret, "\n"); -	return ret; -} - -struct sk_buff *tipc_sk_socks_show(void) -{ -	struct sk_buff *buf; -	struct tlv_desc *rep_tlv; -	char *pb; -	int pb_len; -	struct tipc_sock *tsk; -	int str_len = 0; -	u32 ref = 0; - -	buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN)); -	if (!buf) -		return NULL; -	rep_tlv = (struct tlv_desc *)buf->data; -	pb = TLV_DATA(rep_tlv); -	pb_len = ULTRA_STRING_MAX_LEN; - -	tsk = tipc_sk_get_next(&ref); -	for (; tsk; tsk = tipc_sk_get_next(&ref)) { -		lock_sock(&tsk->sk); -		str_len += tipc_sk_show(tsk, pb + str_len, -					pb_len - str_len, 0); -		release_sock(&tsk->sk); -		tipc_sk_put(tsk); -	} -	str_len += 1;	/* for "\0" */ -	skb_put(buf, TLV_SPACE(str_len)); -	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); - -	return buf; -} -  /* tipc_sk_reinit: set non-zero address in all existing sockets   *                 when we go from standalone to network mode.   */ -void tipc_sk_reinit(void) +void tipc_sk_reinit(struct net *net)  { +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	const struct bucket_table *tbl; +	struct rhash_head *pos; +	struct tipc_sock *tsk;  	struct tipc_msg *msg; -	u32 ref = 0; -	struct tipc_sock *tsk = tipc_sk_get_next(&ref); +	int i; -	for (; tsk; tsk = tipc_sk_get_next(&ref)) { -		lock_sock(&tsk->sk); -		msg = &tsk->phdr; -		msg_set_prevnode(msg, tipc_own_addr); -		msg_set_orignode(msg, tipc_own_addr); -		release_sock(&tsk->sk); -		tipc_sk_put(tsk); +	rcu_read_lock(); +	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); +	for (i = 0; i < tbl->size; i++) { +		rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { +			spin_lock_bh(&tsk->sk.sk_lock.slock); +			msg = &tsk->phdr; +			msg_set_prevnode(msg, tn->own_addr); +			msg_set_orignode(msg, tn->own_addr); +			spin_unlock_bh(&tsk->sk.sk_lock.slock); +		}  	} +	rcu_read_unlock();  } -/** - * struct reference - TIPC socket reference entry - * @tsk: pointer to socket associated with reference entry - * @ref: reference value for socket (combines instance & array index info) - */ -struct reference { -	struct tipc_sock *tsk; -	u32 ref; -}; - -/** - * struct tipc_ref_table - table of TIPC socket reference entries - * @entries: pointer to array of reference entries - * @capacity: array index of first unusable entry - * @init_point: array index of first uninitialized entry - * @first_free: array index of first unused socket reference entry - * @last_free: array index of last unused socket reference entry - * @index_mask: bitmask for array index portion of reference values - * @start_mask: initial value for instance value portion of reference values - */ -struct ref_table { -	struct reference *entries; -	u32 capacity; -	u32 init_point; -	u32 first_free; -	u32 last_free; -	u32 index_mask; -	u32 start_mask; -}; - -/* Socket reference table consists of 2**N entries. - * - * State	Socket ptr	Reference - * -----        ----------      --------- - * In use        non-NULL       XXXX|own index - *				(XXXX changes each time entry is acquired) - * Free            NULL         YYYY|next free index - *				(YYYY is one more than last used XXXX) - * Uninitialized   NULL         0 - * - * Entry 0 is not used; this allows index 0 to denote the end of the free list. - * - * Note that a reference value of 0 does not necessarily indicate that an - * entry is uninitialized, since the last entry in the free list could also - * have a reference value of 0 (although this is unlikely). - */ - -static struct ref_table tipc_ref_table; - -static DEFINE_RWLOCK(ref_table_lock); - -/** - * tipc_ref_table_init - create reference table for sockets - */ -int tipc_sk_ref_table_init(u32 req_sz, u32 start) +static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)  { -	struct reference *table; -	u32 actual_sz; - -	/* account for unused entry, then round up size to a power of 2 */ - -	req_sz++; -	for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) { -		/* do nothing */ -	}; - -	/* allocate table & mark all entries as uninitialized */ -	table = vzalloc(actual_sz * sizeof(struct reference)); -	if (table == NULL) -		return -ENOMEM; - -	tipc_ref_table.entries = table; -	tipc_ref_table.capacity = req_sz; -	tipc_ref_table.init_point = 1; -	tipc_ref_table.first_free = 0; -	tipc_ref_table.last_free = 0; -	tipc_ref_table.index_mask = actual_sz - 1; -	tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	struct tipc_sock *tsk; -	return 0; -} +	rcu_read_lock(); +	tsk = rhashtable_lookup(&tn->sk_rht, &portid); +	if (tsk) +		sock_hold(&tsk->sk); +	rcu_read_unlock(); -/** - * tipc_ref_table_stop - destroy reference table for sockets - */ -void tipc_sk_ref_table_stop(void) -{ -	if (!tipc_ref_table.entries) -		return; -	vfree(tipc_ref_table.entries); -	tipc_ref_table.entries = NULL; +	return tsk;  } -/* tipc_ref_acquire - create reference to a socket - * - * Register an socket pointer in the reference table. - * Returns a unique reference value that is used from then on to retrieve the - * socket pointer, or to determine if the socket has been deregistered. - */ -u32 tipc_sk_ref_acquire(struct tipc_sock *tsk) +static int tipc_sk_insert(struct tipc_sock *tsk)  { -	u32 index; -	u32 index_mask; -	u32 next_plus_upper; -	u32 ref = 0; -	struct reference *entry; - -	if (unlikely(!tsk)) { -		pr_err("Attempt to acquire ref. to non-existent obj\n"); -		return 0; -	} -	if (unlikely(!tipc_ref_table.entries)) { -		pr_err("Ref. table not found in acquisition attempt\n"); -		return 0; -	} - -	/* Take a free entry, if available; otherwise initialize a new one */ -	write_lock_bh(&ref_table_lock); -	index = tipc_ref_table.first_free; -	entry = &tipc_ref_table.entries[index]; - -	if (likely(index)) { -		index = tipc_ref_table.first_free; -		entry = &tipc_ref_table.entries[index]; -		index_mask = tipc_ref_table.index_mask; -		next_plus_upper = entry->ref; -		tipc_ref_table.first_free = next_plus_upper & index_mask; -		ref = (next_plus_upper & ~index_mask) + index; -		entry->tsk = tsk; -	} else if (tipc_ref_table.init_point < tipc_ref_table.capacity) { -		index = tipc_ref_table.init_point++; -		entry = &tipc_ref_table.entries[index]; -		ref = tipc_ref_table.start_mask + index; +	struct sock *sk = &tsk->sk; +	struct net *net = sock_net(sk); +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1; +	u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT; + +	while (remaining--) { +		portid++; +		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT)) +			portid = TIPC_MIN_PORT; +		tsk->portid = portid; +		sock_hold(&tsk->sk); +		if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node)) +			return 0; +		sock_put(&tsk->sk);  	} -	if (ref) { -		entry->ref = ref; -		entry->tsk = tsk; -	} -	write_unlock_bh(&ref_table_lock); -	return ref; +	return -1;  } -/* tipc_sk_ref_discard - invalidate reference to an socket - * - * Disallow future references to an socket and free up the entry for re-use. - */ -void tipc_sk_ref_discard(u32 ref) +static void tipc_sk_remove(struct tipc_sock *tsk)  { -	struct reference *entry; -	u32 index; -	u32 index_mask; - -	if (unlikely(!tipc_ref_table.entries)) { -		pr_err("Ref. table not found during discard attempt\n"); -		return; -	} - -	index_mask = tipc_ref_table.index_mask; -	index = ref & index_mask; -	entry = &tipc_ref_table.entries[index]; - -	write_lock_bh(&ref_table_lock); +	struct sock *sk = &tsk->sk; +	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id); -	if (unlikely(!entry->tsk)) { -		pr_err("Attempt to discard ref. to non-existent socket\n"); -		goto exit; +	if (rhashtable_remove(&tn->sk_rht, &tsk->node)) { +		WARN_ON(atomic_read(&sk->sk_refcnt) == 1); +		__sock_put(sk);  	} -	if (unlikely(entry->ref != ref)) { -		pr_err("Attempt to discard non-existent reference\n"); -		goto exit; -	} - -	/* Mark entry as unused; increment instance part of entry's -	 *   reference to invalidate any subsequent references -	 */ - -	entry->tsk = NULL; -	entry->ref = (ref & ~index_mask) + (index_mask + 1); - -	/* Append entry to free entry list */ -	if (unlikely(tipc_ref_table.first_free == 0)) -		tipc_ref_table.first_free = index; -	else -		tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index; -	tipc_ref_table.last_free = index; -exit: -	write_unlock_bh(&ref_table_lock);  } -/* tipc_sk_get - find referenced socket and return pointer to it - */ -struct tipc_sock *tipc_sk_get(u32 ref) +int tipc_sk_rht_init(struct net *net)  { -	struct reference *entry; -	struct tipc_sock *tsk; +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	struct rhashtable_params rht_params = { +		.nelem_hint = 192, +		.head_offset = offsetof(struct tipc_sock, node), +		.key_offset = offsetof(struct tipc_sock, portid), +		.key_len = sizeof(u32), /* portid */ +		.hashfn = jhash, +		.max_shift = 20, /* 1M */ +		.min_shift = 8,  /* 256 */ +		.grow_decision = rht_grow_above_75, +		.shrink_decision = rht_shrink_below_30, +	}; -	if (unlikely(!tipc_ref_table.entries)) -		return NULL; -	read_lock_bh(&ref_table_lock); -	entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask]; -	tsk = entry->tsk; -	if (likely(tsk && (entry->ref == ref))) -		sock_hold(&tsk->sk); -	else -		tsk = NULL; -	read_unlock_bh(&ref_table_lock); -	return tsk; +	return rhashtable_init(&tn->sk_rht, &rht_params);  } -/* tipc_sk_get_next - lock & return next socket after referenced one -*/ -struct tipc_sock *tipc_sk_get_next(u32 *ref) +void tipc_sk_rht_destroy(struct net *net)  { -	struct reference *entry; -	struct tipc_sock *tsk = NULL; -	uint index = *ref & tipc_ref_table.index_mask; +	struct tipc_net *tn = net_generic(net, tipc_net_id); -	read_lock_bh(&ref_table_lock); -	while (++index < tipc_ref_table.capacity) { -		entry = &tipc_ref_table.entries[index]; -		if (!entry->tsk) -			continue; -		tsk = entry->tsk; -		sock_hold(&tsk->sk); -		*ref = entry->ref; -		break; -	} -	read_unlock_bh(&ref_table_lock); -	return tsk; -} +	/* Wait for socket readers to complete */ +	synchronize_net(); -static void tipc_sk_put(struct tipc_sock *tsk) -{ -	sock_put(&tsk->sk); +	rhashtable_destroy(&tn->sk_rht);  }  /** @@ -2639,8 +2510,9 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,  	return put_user(sizeof(value), ol);  } -static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg) +static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)  { +	struct sock *sk = sock->sk;  	struct tipc_sioc_ln_req lnr;  	void __user *argp = (void __user *)arg; @@ -2648,7 +2520,8 @@ static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)  	case SIOCGETLINKNAME:  		if (copy_from_user(&lnr, argp, sizeof(lnr)))  			return -EFAULT; -		if (!tipc_node_get_linkname(lnr.bearer_id & 0xffff, lnr.peer, +		if (!tipc_node_get_linkname(sock_net(sk), +					    lnr.bearer_id & 0xffff, lnr.peer,  					    lnr.linkname, TIPC_MAX_LINK_NAME)) {  			if (copy_to_user(argp, &lnr, sizeof(lnr)))  				return -EFAULT; @@ -2820,18 +2693,20 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,  	int err;  	void *hdr;  	struct nlattr *attrs; +	struct net *net = sock_net(skb->sk); +	struct tipc_net *tn = net_generic(net, tipc_net_id);  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, -			  &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET); +			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);  	if (!hdr)  		goto msg_cancel;  	attrs = nla_nest_start(skb, TIPC_NLA_SOCK);  	if (!attrs)  		goto genlmsg_cancel; -	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref)) +	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))  		goto attr_msg_cancel; -	if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr)) +	if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))  		goto attr_msg_cancel;  	if (tsk->connected) { @@ -2859,22 +2734,37 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)  {  	int err;  	struct tipc_sock *tsk; -	u32 prev_ref = cb->args[0]; -	u32 ref = prev_ref; - -	tsk = tipc_sk_get_next(&ref); -	for (; tsk; tsk = tipc_sk_get_next(&ref)) { -		lock_sock(&tsk->sk); -		err = __tipc_nl_add_sk(skb, cb, tsk); -		release_sock(&tsk->sk); -		tipc_sk_put(tsk); -		if (err) -			break; +	const struct bucket_table *tbl; +	struct rhash_head *pos; +	struct net *net = sock_net(skb->sk); +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	u32 tbl_id = cb->args[0]; +	u32 prev_portid = cb->args[1]; -		prev_ref = ref; -	} +	rcu_read_lock(); +	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); +	for (; tbl_id < tbl->size; tbl_id++) { +		rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) { +			spin_lock_bh(&tsk->sk.sk_lock.slock); +			if (prev_portid && prev_portid != tsk->portid) { +				spin_unlock_bh(&tsk->sk.sk_lock.slock); +				continue; +			} -	cb->args[0] = prev_ref; +			err = __tipc_nl_add_sk(skb, cb, tsk); +			if (err) { +				prev_portid = tsk->portid; +				spin_unlock_bh(&tsk->sk.sk_lock.slock); +				goto out; +			} +			prev_portid = 0; +			spin_unlock_bh(&tsk->sk.sk_lock.slock); +		} +	} +out: +	rcu_read_unlock(); +	cb->args[0] = tbl_id; +	cb->args[1] = prev_portid;  	return skb->len;  } @@ -2888,7 +2778,7 @@ static int __tipc_nl_add_sk_publ(struct sk_buff *skb,  	struct nlattr *attrs;  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq, -			  &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET); +			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);  	if (!hdr)  		goto msg_cancel; @@ -2962,12 +2852,13 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,  int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)  {  	int err; -	u32 tsk_ref = cb->args[0]; +	u32 tsk_portid = cb->args[0];  	u32 last_publ = cb->args[1];  	u32 done = cb->args[2]; +	struct net *net = sock_net(skb->sk);  	struct tipc_sock *tsk; -	if (!tsk_ref) { +	if (!tsk_portid) {  		struct nlattr **attrs;  		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1]; @@ -2984,13 +2875,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)  		if (!sock[TIPC_NLA_SOCK_REF])  			return -EINVAL; -		tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]); +		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);  	}  	if (done)  		return 0; -	tsk = tipc_sk_get(tsk_ref); +	tsk = tipc_sk_lookup(net, tsk_portid);  	if (!tsk)  		return -EINVAL; @@ -2999,9 +2890,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)  	if (!err)  		done = 1;  	release_sock(&tsk->sk); -	tipc_sk_put(tsk); +	sock_put(&tsk->sk); -	cb->args[0] = tsk_ref; +	cb->args[0] = tsk_portid;  	cb->args[1] = last_publ;  	cb->args[2] = done; |