diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/Kconfig | 8 | ||||
| -rw-r--r-- | net/tipc/Makefile | 1 | ||||
| -rw-r--r-- | net/tipc/addr.c | 7 | ||||
| -rw-r--r-- | net/tipc/addr.h | 1 | ||||
| -rw-r--r-- | net/tipc/bcast.c | 95 | ||||
| -rw-r--r-- | net/tipc/bcast.h | 4 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 13 | ||||
| -rw-r--r-- | net/tipc/bearer.h | 17 | ||||
| -rw-r--r-- | net/tipc/discover.c | 11 | ||||
| -rw-r--r-- | net/tipc/eth_media.c | 8 | ||||
| -rw-r--r-- | net/tipc/ib_media.c | 2 | ||||
| -rw-r--r-- | net/tipc/link.c | 862 | ||||
| -rw-r--r-- | net/tipc/link.h | 51 | ||||
| -rw-r--r-- | net/tipc/msg.c | 130 | ||||
| -rw-r--r-- | net/tipc/msg.h | 133 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 4 | ||||
| -rw-r--r-- | net/tipc/name_table.c | 4 | ||||
| -rw-r--r-- | net/tipc/node.c | 116 | ||||
| -rw-r--r-- | net/tipc/node.h | 18 | ||||
| -rw-r--r-- | net/tipc/server.c | 47 | ||||
| -rw-r--r-- | net/tipc/socket.c | 272 | ||||
| -rw-r--r-- | net/tipc/socket.h | 4 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 23 | ||||
| -rw-r--r-- | net/tipc/udp_media.c | 448 | 
24 files changed, 1319 insertions, 960 deletions
| diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig index 91c8a8e031db..c25a3a149dc4 100644 --- a/net/tipc/Kconfig +++ b/net/tipc/Kconfig @@ -26,3 +26,11 @@ config TIPC_MEDIA_IB  	help  	  Saying Y here will enable support for running TIPC on  	  IP-over-InfiniBand devices. +config TIPC_MEDIA_UDP +	bool "IP/UDP media type support" +	depends on TIPC +	select NET_UDP_TUNNEL +	help +	  Saying Y here will enable support for running TIPC over IP/UDP +	bool +	default y diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 599b1a540d2b..57e460be4692 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -10,5 +10,6 @@ tipc-y	+= addr.o bcast.o bearer.o \  	   netlink.o netlink_compat.o node.o socket.o eth_media.o \  	   server.o socket.o +tipc-$(CONFIG_TIPC_MEDIA_UDP)	+= udp_media.o  tipc-$(CONFIG_TIPC_MEDIA_IB)	+= ib_media.o  tipc-$(CONFIG_SYSCTL)		+= sysctl.o diff --git a/net/tipc/addr.c b/net/tipc/addr.c index 48fd3b5a73fb..ba7daa864d44 100644 --- a/net/tipc/addr.c +++ b/net/tipc/addr.c @@ -38,6 +38,13 @@  #include "addr.h"  #include "core.h" +u32 tipc_own_addr(struct net *net) +{ +	struct tipc_net *tn = net_generic(net, tipc_net_id); + +	return tn->own_addr; +} +  /**   * in_own_cluster - test for cluster inclusion; <0.0.0> always matches   */ diff --git a/net/tipc/addr.h b/net/tipc/addr.h index c700c2d28e09..7ba6d5c8ae40 100644 --- a/net/tipc/addr.h +++ b/net/tipc/addr.h @@ -55,6 +55,7 @@ static inline u32 tipc_cluster_mask(u32 addr)  	return addr & TIPC_CLUSTER_MASK;  } +u32 tipc_own_addr(struct net *net);  int in_own_cluster(struct net *net, u32 addr);  int in_own_cluster_exact(struct net *net, u32 addr);  int in_own_node(struct net *net, u32 addr); diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 3e41704832de..c5cbdcb1f0b5 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -62,21 +62,8 @@ static void tipc_bclink_lock(struct net *net)  static void tipc_bclink_unlock(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); -	struct tipc_node *node = NULL; -	if (likely(!tn->bclink->flags)) { -		spin_unlock_bh(&tn->bclink->lock); -		return; -	} - -	if (tn->bclink->flags & TIPC_BCLINK_RESET) { -		tn->bclink->flags &= ~TIPC_BCLINK_RESET; -		node = tipc_bclink_retransmit_to(net); -	}  	spin_unlock_bh(&tn->bclink->lock); - -	if (node) -		tipc_link_reset_all(node);  }  void tipc_bclink_input(struct net *net) @@ -91,13 +78,6 @@ uint  tipc_bclink_get_mtu(void)  	return MAX_PKT_DEFAULT_MCAST;  } -void tipc_bclink_set_flags(struct net *net, unsigned int flags) -{ -	struct tipc_net *tn = net_generic(net, tipc_net_id); - -	tn->bclink->flags |= flags; -} -  static u32 bcbuf_acks(struct sk_buff *buf)  {  	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; @@ -135,9 +115,10 @@ static void bclink_set_last_sent(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id);  	struct tipc_link *bcl = tn->bcl; +	struct sk_buff *skb = skb_peek(&bcl->backlogq); -	if (bcl->next_out) -		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); +	if (skb) +		bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);  	else  		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);  } @@ -155,7 +136,6 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)  						seqno : node->bclink.last_sent;  } -  /**   * tipc_bclink_retransmit_to - get most recent node to request retransmission   * @@ -180,7 +160,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)  	struct sk_buff *skb;  	struct tipc_link *bcl = tn->bcl; -	skb_queue_walk(&bcl->outqueue, skb) { +	skb_queue_walk(&bcl->transmq, skb) {  		if (more(buf_seqno(skb), after)) {  			tipc_link_retransmit(bcl, skb, mod(to - after));  			break; @@ -210,14 +190,17 @@ void tipc_bclink_wakeup_users(struct net *net)  void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  {  	struct sk_buff *skb, *tmp; -	struct sk_buff *next;  	unsigned int released = 0;  	struct net *net = n_ptr->net;  	struct tipc_net *tn = net_generic(net, tipc_net_id); +	if (unlikely(!n_ptr->bclink.recv_permitted)) +		return; +  	tipc_bclink_lock(net); +  	/* Bail out if tx queue is empty (no clean up is required) */ -	skb = skb_peek(&tn->bcl->outqueue); +	skb = skb_peek(&tn->bcl->transmq);  	if (!skb)  		goto exit; @@ -244,27 +227,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  	}  	/* Skip over packets that node has previously acknowledged */ -	skb_queue_walk(&tn->bcl->outqueue, skb) { +	skb_queue_walk(&tn->bcl->transmq, skb) {  		if (more(buf_seqno(skb), n_ptr->bclink.acked))  			break;  	}  	/* Update packets that node is now acknowledging */ -	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) { +	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {  		if (more(buf_seqno(skb), acked))  			break; - -		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb); -		if (skb != tn->bcl->next_out) { -			bcbuf_decr_acks(skb); -		} else { -			bcbuf_set_acks(skb, 0); -			tn->bcl->next_out = next; -			bclink_set_last_sent(net); -		} - +		bcbuf_decr_acks(skb); +		bclink_set_last_sent(net);  		if (bcbuf_acks(skb) == 0) { -			__skb_unlink(skb, &tn->bcl->outqueue); +			__skb_unlink(skb, &tn->bcl->transmq);  			kfree_skb(skb);  			released = 1;  		} @@ -272,7 +247,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  	n_ptr->bclink.acked = acked;  	/* Try resolving broadcast link congestion, if necessary */ -	if (unlikely(tn->bcl->next_out)) { +	if (unlikely(skb_peek(&tn->bcl->backlogq))) {  		tipc_link_push_packets(tn->bcl);  		bclink_set_last_sent(net);  	} @@ -319,7 +294,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,  	buf = tipc_buf_acquire(INT_H_SIZE);  	if (buf) {  		struct tipc_msg *msg = buf_msg(buf); -		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); +		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);  		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;  		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, @@ -354,13 +329,12 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)  		return;  	tipc_node_lock(n_ptr); -  	if (n_ptr->bclink.recv_permitted &&  	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&  	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))  		n_ptr->bclink.oos_state = 2; -  	tipc_node_unlock(n_ptr); +	tipc_node_put(n_ptr);  }  /* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster @@ -387,14 +361,13 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)  		__skb_queue_purge(list);  		return -EHOSTUNREACH;  	} -  	/* Broadcast to all nodes */  	if (likely(bclink)) {  		tipc_bclink_lock(net);  		if (likely(bclink->bcast_nodes.count)) {  			rc = __tipc_link_xmit(net, bcl, list);  			if (likely(!rc)) { -				u32 len = skb_queue_len(&bcl->outqueue); +				u32 len = skb_queue_len(&bcl->transmq);  				bclink_set_last_sent(net);  				bcl->stats.queue_sz_counts++; @@ -440,7 +413,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)  	 */  	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {  		tipc_link_proto_xmit(node->active_links[node->addr & 1], -				     STATE_MSG, 0, 0, 0, 0, 0); +				     STATE_MSG, 0, 0, 0, 0);  		tn->bcl->stats.sent_acks++;  	}  } @@ -481,17 +454,18 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)  			goto unlock;  		if (msg_destnode(msg) == tn->own_addr) {  			tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); -			tipc_node_unlock(node);  			tipc_bclink_lock(net);  			bcl->stats.recv_nacks++;  			tn->bclink->retransmit_to = node;  			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),  					      msg_bcgap_to(msg));  			tipc_bclink_unlock(net); +			tipc_node_unlock(node);  		} else {  			tipc_node_unlock(node);  			bclink_peek_nack(net, msg);  		} +		tipc_node_put(node);  		goto exit;  	} @@ -528,11 +502,13 @@ receive:  			tipc_bclink_unlock(net);  			tipc_node_unlock(node);  		} else if (msg_user(msg) == MSG_FRAGMENTER) { -			tipc_buf_append(&node->bclink.reasm_buf, &buf); -			if (unlikely(!buf && !node->bclink.reasm_buf)) -				goto unlock;  			tipc_bclink_lock(net);  			bclink_accept_pkt(node, seqno); +			tipc_buf_append(&node->bclink.reasm_buf, &buf); +			if (unlikely(!buf && !node->bclink.reasm_buf)) { +				tipc_bclink_unlock(net); +				goto unlock; +			}  			bcl->stats.recv_fragments++;  			if (buf) {  				bcl->stats.recv_fragmented++; @@ -559,25 +535,25 @@ receive:  		if (node->bclink.last_in == node->bclink.last_sent)  			goto unlock; -		if (skb_queue_empty(&node->bclink.deferred_queue)) { +		if (skb_queue_empty(&node->bclink.deferdq)) {  			node->bclink.oos_state = 1;  			goto unlock;  		} -		msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); +		msg = buf_msg(skb_peek(&node->bclink.deferdq));  		seqno = msg_seqno(msg);  		next_in = mod(next_in + 1);  		if (seqno != next_in)  			goto unlock;  		/* Take in-sequence message from deferred queue & deliver it */ -		buf = __skb_dequeue(&node->bclink.deferred_queue); +		buf = __skb_dequeue(&node->bclink.deferdq);  		goto receive;  	}  	/* Handle out-of-sequence broadcast message */  	if (less(next_in, seqno)) { -		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, +		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,  					       buf);  		bclink_update_last_sent(node, seqno);  		buf = NULL; @@ -594,6 +570,7 @@ receive:  unlock:  	tipc_node_unlock(node); +	tipc_node_put(node);  exit:  	kfree_skb(buf);  } @@ -634,7 +611,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,  		msg_set_non_seq(msg, 1);  		msg_set_mc_netid(msg, tn->net_id);  		tn->bcl->stats.sent_info++; -  		if (WARN_ON(!bclink->bcast_nodes.count)) {  			dump_stack();  			return 0; @@ -835,7 +811,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)  	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);  	if (!prop)  		goto attr_msg_full; -	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) +	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))  		goto prop_msg_full;  	nla_nest_end(msg->skb, prop); @@ -913,8 +889,9 @@ int tipc_bclink_init(struct net *net)  	sprintf(bcbearer->media.name, "tipc-broadcast");  	spin_lock_init(&bclink->lock); -	__skb_queue_head_init(&bcl->outqueue); -	__skb_queue_head_init(&bcl->deferred_queue); +	__skb_queue_head_init(&bcl->transmq); +	__skb_queue_head_init(&bcl->backlogq); +	__skb_queue_head_init(&bcl->deferdq);  	skb_queue_head_init(&bcl->wakeupq);  	bcl->next_out_no = 1;  	spin_lock_init(&bclink->node.lock); @@ -922,7 +899,7 @@ int tipc_bclink_init(struct net *net)  	skb_queue_head_init(&bclink->inputq);  	bcl->owner = &bclink->node;  	bcl->owner->net = net; -	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; +	bcl->mtu = MAX_PKT_DEFAULT_MCAST;  	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);  	bcl->bearer_id = MAX_BEARERS;  	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 43f397fbac55..4bdc12277d33 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -55,7 +55,6 @@ struct tipc_bcbearer_pair {  	struct tipc_bearer *secondary;  }; -#define TIPC_BCLINK_RESET	1  #define	BCBEARER		MAX_BEARERS  /** @@ -86,7 +85,6 @@ struct tipc_bcbearer {   * @lock: spinlock governing access to structure   * @link: (non-standard) broadcast link structure   * @node: (non-standard) node structure representing b'cast link's peer node - * @flags: represent bclink states   * @bcast_nodes: map of broadcast-capable nodes   * @retransmit_to: node that most recently requested a retransmit   * @@ -96,7 +94,6 @@ struct tipc_bclink {  	spinlock_t lock;  	struct tipc_link link;  	struct tipc_node node; -	unsigned int flags;  	struct sk_buff_head arrvq;  	struct sk_buff_head inputq;  	struct tipc_node_map bcast_nodes; @@ -117,7 +114,6 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,  int tipc_bclink_init(struct net *net);  void tipc_bclink_stop(struct net *net); -void tipc_bclink_set_flags(struct net *tn, unsigned int flags);  void tipc_bclink_add_node(struct net *net, u32 addr);  void tipc_bclink_remove_node(struct net *net, u32 addr);  struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 48852c2dcc03..3613e72e858e 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -48,6 +48,9 @@ static struct tipc_media * const media_info_array[] = {  #ifdef CONFIG_TIPC_MEDIA_IB  	&ib_media_info,  #endif +#ifdef CONFIG_TIPC_MEDIA_UDP +	&udp_media_info, +#endif  	NULL  }; @@ -216,7 +219,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)   * tipc_enable_bearer - enable bearer with the given name   */  static int tipc_enable_bearer(struct net *net, const char *name, -			      u32 disc_domain, u32 priority) +			      u32 disc_domain, u32 priority, +			      struct nlattr *attr[])  {  	struct tipc_net *tn = net_generic(net, tipc_net_id);  	struct tipc_bearer *b_ptr; @@ -304,7 +308,7 @@ restart:  	strcpy(b_ptr->name, name);  	b_ptr->media = m_ptr; -	res = m_ptr->enable_media(net, b_ptr); +	res = m_ptr->enable_media(net, b_ptr, attr);  	if (res) {  		pr_warn("Bearer <%s> rejected, enable failure (%d)\n",  			name, -res); @@ -372,7 +376,8 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr,  	kfree_rcu(b_ptr, rcu);  } -int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b) +int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b, +			 struct nlattr *attr[])  {  	struct net_device *dev;  	char *driver_name = strchr((const char *)b->name, ':') + 1; @@ -791,7 +796,7 @@ int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info)  	}  	rtnl_lock(); -	err = tipc_enable_bearer(net, bearer, domain, prio); +	err = tipc_enable_bearer(net, bearer, domain, prio, attrs);  	if (err) {  		rtnl_unlock();  		return err; diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 6b17795ff8bc..5cad243ee8fc 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -41,7 +41,7 @@  #include <net/genetlink.h>  #define MAX_BEARERS	2 -#define MAX_MEDIA	2 +#define MAX_MEDIA	3  #define MAX_NODES	4096  #define WSIZE		32 @@ -50,14 +50,16 @@   * - the field's actual content and length is defined per media   * - remaining unused bytes in the field are set to zero   */ -#define TIPC_MEDIA_ADDR_SIZE	32 +#define TIPC_MEDIA_INFO_SIZE	32  #define TIPC_MEDIA_TYPE_OFFSET	3 +#define TIPC_MEDIA_ADDR_OFFSET	4  /*   * Identifiers of supported TIPC media types   */  #define TIPC_MEDIA_TYPE_ETH	1  #define TIPC_MEDIA_TYPE_IB	2 +#define TIPC_MEDIA_TYPE_UDP	3  /**   * struct tipc_node_map - set of node identifiers @@ -76,7 +78,7 @@ struct tipc_node_map {   * @broadcast: non-zero if address is a broadcast address   */  struct tipc_media_addr { -	u8 value[TIPC_MEDIA_ADDR_SIZE]; +	u8 value[TIPC_MEDIA_INFO_SIZE];  	u8 media_id;  	u8 broadcast;  }; @@ -103,7 +105,8 @@ struct tipc_media {  	int (*send_msg)(struct net *net, struct sk_buff *buf,  			struct tipc_bearer *b_ptr,  			struct tipc_media_addr *dest); -	int (*enable_media)(struct net *net, struct tipc_bearer *b_ptr); +	int (*enable_media)(struct net *net, struct tipc_bearer *b_ptr, +			    struct nlattr *attr[]);  	void (*disable_media)(struct tipc_bearer *b_ptr);  	int (*addr2str)(struct tipc_media_addr *addr,  			char *strbuf, @@ -182,6 +185,9 @@ extern struct tipc_media eth_media_info;  #ifdef CONFIG_TIPC_MEDIA_IB  extern struct tipc_media ib_media_info;  #endif +#ifdef CONFIG_TIPC_MEDIA_UDP +extern struct tipc_media udp_media_info; +#endif  int tipc_nl_bearer_disable(struct sk_buff *skb, struct genl_info *info);  int tipc_nl_bearer_enable(struct sk_buff *skb, struct genl_info *info); @@ -196,7 +202,8 @@ int tipc_nl_media_set(struct sk_buff *skb, struct genl_info *info);  int tipc_media_set_priority(const char *name, u32 new_value);  int tipc_media_set_window(const char *name, u32 new_value);  void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); -int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b); +int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b, +			 struct nlattr *attrs[]);  void tipc_disable_l2_media(struct tipc_bearer *b);  int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,  		     struct tipc_bearer *b, struct tipc_media_addr *dest); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index feef3753615d..967e292f53c8 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -86,9 +86,10 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,  	msg = buf_msg(buf);  	tipc_msg_init(tn->own_addr, msg, LINK_CONFIG, type, -		      INT_H_SIZE, dest_domain); +		      MAX_H_SIZE, dest_domain);  	msg_set_non_seq(msg, 1);  	msg_set_node_sig(msg, tn->random); +	msg_set_node_capabilities(msg, 0);  	msg_set_dest_domain(msg, dest_domain);  	msg_set_bc_netid(msg, tn->net_id);  	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); @@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,  	u32 net_id = msg_bc_netid(msg);  	u32 mtyp = msg_type(msg);  	u32 signature = msg_node_sig(msg); +	u16 caps = msg_node_capabilities(msg);  	bool addr_match = false;  	bool sign_match = false;  	bool link_up = false; @@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,  	if (!node)  		return;  	tipc_node_lock(node); +	node->capabilities = caps;  	link = node->links[bearer->identity];  	/* Prepare to validate requesting node's signature and media address */ @@ -249,7 +252,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,  	/* Send response, if necessary */  	if (respond && (mtyp == DSC_REQ_MSG)) { -		rbuf = tipc_buf_acquire(INT_H_SIZE); +		rbuf = tipc_buf_acquire(MAX_H_SIZE);  		if (rbuf) {  			tipc_disc_init_msg(net, rbuf, DSC_RESP_MSG, bearer);  			tipc_bearer_send(net, bearer->identity, rbuf, &maddr); @@ -257,6 +260,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,  		}  	}  	tipc_node_unlock(node); +	tipc_node_put(node);  }  /** @@ -359,8 +363,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,  	req = kmalloc(sizeof(*req), GFP_ATOMIC);  	if (!req)  		return -ENOMEM; - -	req->buf = tipc_buf_acquire(INT_H_SIZE); +	req->buf = tipc_buf_acquire(MAX_H_SIZE);  	if (!req->buf) {  		kfree(req);  		return -ENOMEM; diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index 5e1426f1751f..f69a2fde9f4a 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -37,8 +37,6 @@  #include "core.h"  #include "bearer.h" -#define ETH_ADDR_OFFSET  4  /* MAC addr position inside address field */ -  /* Convert Ethernet address (media address format) to string */  static int tipc_eth_addr2str(struct tipc_media_addr *addr,  			     char *strbuf, int bufsz) @@ -53,9 +51,9 @@ static int tipc_eth_addr2str(struct tipc_media_addr *addr,  /* Convert from media address format to discovery message addr format */  static int tipc_eth_addr2msg(char *msg, struct tipc_media_addr *addr)  { -	memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); +	memset(msg, 0, TIPC_MEDIA_INFO_SIZE);  	msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; -	memcpy(msg + ETH_ADDR_OFFSET, addr->value, ETH_ALEN); +	memcpy(msg + TIPC_MEDIA_ADDR_OFFSET, addr->value, ETH_ALEN);  	return 0;  } @@ -79,7 +77,7 @@ static int tipc_eth_msg2addr(struct tipc_bearer *b,  			     char *msg)  {  	/* Skip past preamble: */ -	msg += ETH_ADDR_OFFSET; +	msg += TIPC_MEDIA_ADDR_OFFSET;  	return tipc_eth_raw2addr(b, addr, msg);  } diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c index 8522eef9c136..e8c16718e3fa 100644 --- a/net/tipc/ib_media.c +++ b/net/tipc/ib_media.c @@ -57,7 +57,7 @@ static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf,  /* Convert from media address format to discovery message addr format */  static int tipc_ib_addr2msg(char *msg, struct tipc_media_addr *addr)  { -	memset(msg, 0, TIPC_MEDIA_ADDR_SIZE); +	memset(msg, 0, TIPC_MEDIA_INFO_SIZE);  	memcpy(msg, addr->value, INFINIBAND_ALEN);  	return 0;  } diff --git a/net/tipc/link.c b/net/tipc/link.c index 14f09b3cb87c..a6b30df6ec02 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@  /*   * net/tipc/link.c: TIPC link code   * - * Copyright (c) 1996-2007, 2012-2014, Ericsson AB + * Copyright (c) 1996-2007, 2012-2015, Ericsson AB   * Copyright (c) 2004-2007, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -35,6 +35,7 @@   */  #include "core.h" +#include "subscr.h"  #include "link.h"  #include "bcast.h"  #include "socket.h" @@ -88,24 +89,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {  #define  TIMEOUT_EVT     560817u	/* link timer expired */  /* - * The following two 'message types' is really just implementation - * data conveniently stored in the message header. - * They must not be considered part of the protocol + * State value stored in 'failover_pkts'   */ -#define OPEN_MSG   0 -#define CLOSED_MSG 1 - -/* - * State value stored in 'exp_msg_count' - */ -#define START_CHANGEOVER 100000u +#define FIRST_FAILOVER 0xffffu  static void link_handle_out_of_seq_msg(struct tipc_link *link,  				       struct sk_buff *skb);  static void tipc_link_proto_rcv(struct tipc_link *link,  				struct sk_buff *skb); -static int  tipc_link_tunnel_rcv(struct tipc_node *node, -				 struct sk_buff **skb);  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);  static void link_state_event(struct tipc_link *l_ptr, u32 event);  static void link_reset_statistics(struct tipc_link *l_ptr); @@ -114,7 +105,7 @@ static void tipc_link_sync_xmit(struct tipc_link *l);  static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);  static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);  static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); - +static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);  /*   *  Simple link routines   */ @@ -138,32 +129,11 @@ static void tipc_link_put(struct tipc_link *l_ptr)  	kref_put(&l_ptr->ref, tipc_link_release);  } -static void link_init_max_pkt(struct tipc_link *l_ptr) +static struct tipc_link *tipc_parallel_link(struct tipc_link *l)  { -	struct tipc_node *node = l_ptr->owner; -	struct tipc_net *tn = net_generic(node->net, tipc_net_id); -	struct tipc_bearer *b_ptr; -	u32 max_pkt; - -	rcu_read_lock(); -	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]); -	if (!b_ptr) { -		rcu_read_unlock(); -		return; -	} -	max_pkt = (b_ptr->mtu & ~3); -	rcu_read_unlock(); - -	if (max_pkt > MAX_MSG_SIZE) -		max_pkt = MAX_MSG_SIZE; - -	l_ptr->max_pkt_target = max_pkt; -	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) -		l_ptr->max_pkt = l_ptr->max_pkt_target; -	else -		l_ptr->max_pkt = MAX_PKT_DEFAULT; - -	l_ptr->max_pkt_probes = 0; +	if (l->owner->active_links[0] != l) +		return l->owner->active_links[0]; +	return l->owner->active_links[1];  }  /* @@ -194,10 +164,10 @@ static void link_timeout(unsigned long data)  	tipc_node_lock(l_ptr->owner);  	/* update counters used in statistical profiling of send traffic */ -	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue); +	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);  	l_ptr->stats.queue_sz_counts++; -	skb = skb_peek(&l_ptr->outqueue); +	skb = skb_peek(&l_ptr->transmq);  	if (skb) {  		struct tipc_msg *msg = buf_msg(skb);  		u32 length = msg_size(msg); @@ -229,7 +199,7 @@ static void link_timeout(unsigned long data)  	/* do all other link processing performed on a periodic basis */  	link_state_event(l_ptr, TIMEOUT_EVT); -	if (l_ptr->next_out) +	if (skb_queue_len(&l_ptr->backlogq))  		tipc_link_push_packets(l_ptr);  	tipc_node_unlock(l_ptr->owner); @@ -305,16 +275,15 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  	msg_set_session(msg, (tn->random & 0xffff));  	msg_set_bearer_id(msg, b_ptr->identity);  	strcpy((char *)msg_data(msg), if_name); - +	l_ptr->net_plane = b_ptr->net_plane; +	l_ptr->advertised_mtu = b_ptr->mtu; +	l_ptr->mtu = l_ptr->advertised_mtu;  	l_ptr->priority = b_ptr->priority;  	tipc_link_set_queue_limits(l_ptr, b_ptr->window); - -	l_ptr->net_plane = b_ptr->net_plane; -	link_init_max_pkt(l_ptr); -  	l_ptr->next_out_no = 1; -	__skb_queue_head_init(&l_ptr->outqueue); -	__skb_queue_head_init(&l_ptr->deferred_queue); +	__skb_queue_head_init(&l_ptr->transmq); +	__skb_queue_head_init(&l_ptr->backlogq); +	__skb_queue_head_init(&l_ptr->deferdq);  	skb_queue_head_init(&l_ptr->wakeupq);  	skb_queue_head_init(&l_ptr->inputq);  	skb_queue_head_init(&l_ptr->namedq); @@ -327,15 +296,19 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  }  /** - * link_delete - Conditional deletion of link. - *               If timer still running, real delete is done when it expires - * @link: link to be deleted + * tipc_link_delete - Delete a link + * @l: link to be deleted   */ -void tipc_link_delete(struct tipc_link *link) +void tipc_link_delete(struct tipc_link *l)  { -	tipc_link_reset_fragments(link); -	tipc_node_detach_link(link->owner, link); -	tipc_link_put(link); +	tipc_link_reset(l); +	if (del_timer(&l->timer)) +		tipc_link_put(l); +	l->flags |= LINK_STOPPED; +	/* Delete link now, or when timer is finished: */ +	tipc_link_reset_fragments(l); +	tipc_node_detach_link(l->owner, l); +	tipc_link_put(l);  }  void tipc_link_delete_list(struct net *net, unsigned int bearer_id, @@ -349,16 +322,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,  	list_for_each_entry_rcu(node, &tn->node_list, list) {  		tipc_node_lock(node);  		link = node->links[bearer_id]; -		if (!link) { -			tipc_node_unlock(node); -			continue; -		} -		tipc_link_reset(link); -		if (del_timer(&link->timer)) -			tipc_link_put(link); -		link->flags |= LINK_STOPPED; -		/* Delete link now, or when failover is finished: */ -		if (shutting_down || !tipc_node_is_up(node)) +		if (link)  			tipc_link_delete(link);  		tipc_node_unlock(node);  	} @@ -366,28 +330,43 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,  }  /** - * link_schedule_user - schedule user for wakeup after congestion + * link_schedule_user - schedule a message sender for wakeup after congestion   * @link: congested link - * @oport: sending port - * @chain_sz: size of buffer chain that was attempted sent - * @imp: importance of message attempted sent + * @list: message that was attempted sent   * Create pseudo msg to send back to user when congestion abates + * Only consumes message if there is an error   */ -static bool link_schedule_user(struct tipc_link *link, u32 oport, -			       uint chain_sz, uint imp) +static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)  { -	struct sk_buff *buf; +	struct tipc_msg *msg = buf_msg(skb_peek(list)); +	int imp = msg_importance(msg); +	u32 oport = msg_origport(msg); +	u32 addr = link_own_addr(link); +	struct sk_buff *skb; -	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, -			      link_own_addr(link), link_own_addr(link), -			      oport, 0, 0); -	if (!buf) -		return false; -	TIPC_SKB_CB(buf)->chain_sz = chain_sz; -	TIPC_SKB_CB(buf)->chain_imp = imp; -	skb_queue_tail(&link->wakeupq, buf); +	/* This really cannot happen...  */ +	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { +		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); +		tipc_link_reset(link); +		goto err; +	} +	/* Non-blocking sender: */ +	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) +		return -ELINKCONG; + +	/* Create and schedule wakeup pseudo message */ +	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, +			      addr, addr, oport, 0, 0); +	if (!skb) +		goto err; +	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); +	TIPC_SKB_CB(skb)->chain_imp = imp; +	skb_queue_tail(&link->wakeupq, skb);  	link->stats.link_congs++; -	return true; +	return -ELINKCONG; +err: +	__skb_queue_purge(list); +	return -ENOBUFS;  }  /** @@ -396,19 +375,22 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,   * Move a number of waiting users, as permitted by available space in   * the send queue, from link wait queue to node wait queue for wakeup   */ -void link_prepare_wakeup(struct tipc_link *link) +void link_prepare_wakeup(struct tipc_link *l)  { -	uint pend_qsz = skb_queue_len(&link->outqueue); +	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; +	int imp, lim;  	struct sk_buff *skb, *tmp; -	skb_queue_walk_safe(&link->wakeupq, skb, tmp) { -		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) +	skb_queue_walk_safe(&l->wakeupq, skb, tmp) { +		imp = TIPC_SKB_CB(skb)->chain_imp; +		lim = l->window + l->backlog[imp].limit; +		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; +		if ((pnd[imp] + l->backlog[imp].len) >= lim)  			break; -		pend_qsz += TIPC_SKB_CB(skb)->chain_sz; -		skb_unlink(skb, &link->wakeupq); -		skb_queue_tail(&link->inputq, skb); -		link->owner->inputq = &link->inputq; -		link->owner->action_flags |= TIPC_MSG_EVT; +		skb_unlink(skb, &l->wakeupq); +		skb_queue_tail(&l->inputq, skb); +		l->owner->inputq = &l->inputq; +		l->owner->action_flags |= TIPC_MSG_EVT;  	}  } @@ -422,31 +404,42 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)  	l_ptr->reasm_buf = NULL;  } +static void tipc_link_purge_backlog(struct tipc_link *l) +{ +	__skb_queue_purge(&l->backlogq); +	l->backlog[TIPC_LOW_IMPORTANCE].len = 0; +	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; +	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; +	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; +	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; +} +  /**   * tipc_link_purge_queues - purge all pkt queues associated with link   * @l_ptr: pointer to link   */  void tipc_link_purge_queues(struct tipc_link *l_ptr)  { -	__skb_queue_purge(&l_ptr->deferred_queue); -	__skb_queue_purge(&l_ptr->outqueue); +	__skb_queue_purge(&l_ptr->deferdq); +	__skb_queue_purge(&l_ptr->transmq); +	tipc_link_purge_backlog(l_ptr);  	tipc_link_reset_fragments(l_ptr);  }  void tipc_link_reset(struct tipc_link *l_ptr)  {  	u32 prev_state = l_ptr->state; -	u32 checkpoint = l_ptr->next_in_no;  	int was_active_link = tipc_link_is_active(l_ptr);  	struct tipc_node *owner = l_ptr->owner; +	struct tipc_link *pl = tipc_parallel_link(l_ptr);  	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));  	/* Link is down, accept any session */  	l_ptr->peer_session = INVALID_SESSION; -	/* Prepare for max packet size negotiation */ -	link_init_max_pkt(l_ptr); +	/* Prepare for renewed mtu size negotiation */ +	l_ptr->mtu = l_ptr->advertised_mtu;  	l_ptr->state = RESET_UNKNOWN; @@ -456,21 +449,26 @@ void tipc_link_reset(struct tipc_link *l_ptr)  	tipc_node_link_down(l_ptr->owner, l_ptr);  	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr); -	if (was_active_link && tipc_node_active_links(l_ptr->owner)) { -		l_ptr->reset_checkpoint = checkpoint; -		l_ptr->exp_msg_count = START_CHANGEOVER; +	if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { +		l_ptr->flags |= LINK_FAILINGOVER; +		l_ptr->failover_checkpt = l_ptr->next_in_no; +		pl->failover_pkts = FIRST_FAILOVER; +		pl->failover_checkpt = l_ptr->next_in_no; +		pl->failover_skb = l_ptr->reasm_buf; +	} else { +		kfree_skb(l_ptr->reasm_buf);  	} -  	/* Clean up all queues, except inputq: */ -	__skb_queue_purge(&l_ptr->outqueue); -	__skb_queue_purge(&l_ptr->deferred_queue); +	__skb_queue_purge(&l_ptr->transmq); +	__skb_queue_purge(&l_ptr->deferdq);  	if (!owner->inputq)  		owner->inputq = &l_ptr->inputq;  	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);  	if (!skb_queue_empty(owner->inputq))  		owner->action_flags |= TIPC_MSG_EVT; -	l_ptr->next_out = NULL; -	l_ptr->unacked_window = 0; +	tipc_link_purge_backlog(l_ptr); +	l_ptr->reasm_buf = NULL; +	l_ptr->rcv_unacked = 0;  	l_ptr->checkpoint = 1;  	l_ptr->next_out_no = 1;  	l_ptr->fsm_msg_cnt = 0; @@ -521,8 +519,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))  		return;		/* Not yet. */ -	/* Check whether changeover is going on */ -	if (l_ptr->exp_msg_count) { +	if (l_ptr->flags & LINK_FAILINGOVER) {  		if (event == TIMEOUT_EVT)  			link_set_timer(l_ptr, cont_intv);  		return; @@ -539,11 +536,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  				l_ptr->checkpoint = l_ptr->next_in_no;  				if (tipc_bclink_acks_missing(l_ptr->owner)) {  					tipc_link_proto_xmit(l_ptr, STATE_MSG, -							     0, 0, 0, 0, 0); -					l_ptr->fsm_msg_cnt++; -				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { -					tipc_link_proto_xmit(l_ptr, STATE_MSG, -							     1, 0, 0, 0, 0); +							     0, 0, 0, 0);  					l_ptr->fsm_msg_cnt++;  				}  				link_set_timer(l_ptr, cont_intv); @@ -551,7 +544,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			}  			l_ptr->state = WORKING_UNKNOWN;  			l_ptr->fsm_msg_cnt = 0; -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv / 4);  			break; @@ -562,7 +555,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0;  			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0, 0); +					     0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -585,7 +578,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0;  			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0, 0); +					     0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -596,13 +589,13 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  				l_ptr->checkpoint = l_ptr->next_in_no;  				if (tipc_bclink_acks_missing(l_ptr->owner)) {  					tipc_link_proto_xmit(l_ptr, STATE_MSG, -							     0, 0, 0, 0, 0); +							     0, 0, 0, 0);  					l_ptr->fsm_msg_cnt++;  				}  				link_set_timer(l_ptr, cont_intv);  			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {  				tipc_link_proto_xmit(l_ptr, STATE_MSG, -						     1, 0, 0, 0, 0); +						     1, 0, 0, 0);  				l_ptr->fsm_msg_cnt++;  				link_set_timer(l_ptr, cont_intv / 4);  			} else {	/* Link has failed */ @@ -612,7 +605,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  				l_ptr->state = RESET_UNKNOWN;  				l_ptr->fsm_msg_cnt = 0;  				tipc_link_proto_xmit(l_ptr, RESET_MSG, -						     0, 0, 0, 0, 0); +						     0, 0, 0, 0);  				l_ptr->fsm_msg_cnt++;  				link_set_timer(l_ptr, cont_intv);  			} @@ -632,7 +625,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = WORKING_WORKING;  			l_ptr->fsm_msg_cnt = 0;  			link_activate(l_ptr); -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			if (l_ptr->owner->working_links == 1)  				tipc_link_sync_xmit(l_ptr); @@ -642,7 +635,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0;  			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     1, 0, 0, 0, 0); +					     1, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -652,7 +645,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			link_set_timer(l_ptr, cont_intv);  			break;  		case TIMEOUT_EVT: -			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -670,7 +663,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = WORKING_WORKING;  			l_ptr->fsm_msg_cnt = 0;  			link_activate(l_ptr); -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			if (l_ptr->owner->working_links == 1)  				tipc_link_sync_xmit(l_ptr); @@ -680,7 +673,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			break;  		case TIMEOUT_EVT:  			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0, 0); +					     0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -693,101 +686,65 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  	}  } -/* tipc_link_cong: determine return value and how to treat the - * sent buffer during link congestion. - * - For plain, errorless user data messages we keep the buffer and - *   return -ELINKONG. - * - For all other messages we discard the buffer and return -EHOSTUNREACH - * - For TIPC internal messages we also reset the link - */ -static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list) -{ -	struct sk_buff *skb = skb_peek(list); -	struct tipc_msg *msg = buf_msg(skb); -	uint imp = tipc_msg_tot_importance(msg); -	u32 oport = msg_tot_origport(msg); - -	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { -		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); -		tipc_link_reset(link); -		goto drop; -	} -	if (unlikely(msg_errcode(msg))) -		goto drop; -	if (unlikely(msg_reroute_cnt(msg))) -		goto drop; -	if (TIPC_SKB_CB(skb)->wakeup_pending) -		return -ELINKCONG; -	if (link_schedule_user(link, oport, skb_queue_len(list), imp)) -		return -ELINKCONG; -drop: -	__skb_queue_purge(list); -	return -EHOSTUNREACH; -} -  /**   * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked   * @link: link to use   * @list: chain of buffers containing message   * - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket - * user data messages) or -EHOSTUNREACH (all other messages/senders) - * Only the socket functions tipc_send_stream() and tipc_send_packet() need - * to act on the return value, since they may need to do more send attempts. + * Consumes the buffer chain, except when returning -ELINKCONG, + * since the caller then may want to make more send attempts. + * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS + * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted   */  int __tipc_link_xmit(struct net *net, struct tipc_link *link,  		     struct sk_buff_head *list)  {  	struct tipc_msg *msg = buf_msg(skb_peek(list)); -	uint psz = msg_size(msg); -	uint sndlim = link->queue_limit[0]; -	uint imp = tipc_msg_tot_importance(msg); -	uint mtu = link->max_pkt; +	unsigned int maxwin = link->window; +	unsigned int imp = msg_importance(msg); +	uint mtu = link->mtu;  	uint ack = mod(link->next_in_no - 1);  	uint seqno = link->next_out_no;  	uint bc_last_in = link->owner->bclink.last_in;  	struct tipc_media_addr *addr = &link->media_addr; -	struct sk_buff_head *outqueue = &link->outqueue; +	struct sk_buff_head *transmq = &link->transmq; +	struct sk_buff_head *backlogq = &link->backlogq;  	struct sk_buff *skb, *tmp; -	/* Match queue limits against msg importance: */ -	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp])) -		return tipc_link_cong(link, list); +	/* Match backlog limit against msg importance: */ +	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit)) +		return link_schedule_user(link, list); -	/* Has valid packet limit been used ? */ -	if (unlikely(psz > mtu)) { +	if (unlikely(msg_size(msg) > mtu)) {  		__skb_queue_purge(list);  		return -EMSGSIZE;  	} - -	/* Prepare each packet for sending, and add to outqueue: */ +	/* Prepare each packet for sending, and add to relevant queue: */  	skb_queue_walk_safe(list, skb, tmp) {  		__skb_unlink(skb, list);  		msg = buf_msg(skb); -		msg_set_word(msg, 2, ((ack << 16) | mod(seqno))); +		msg_set_seqno(msg, seqno); +		msg_set_ack(msg, ack);  		msg_set_bcast_ack(msg, bc_last_in); -		if (skb_queue_len(outqueue) < sndlim) { -			__skb_queue_tail(outqueue, skb); -			tipc_bearer_send(net, link->bearer_id, -					 skb, addr); -			link->next_out = NULL; -			link->unacked_window = 0; -		} else if (tipc_msg_bundle(outqueue, skb, mtu)) { +		if (likely(skb_queue_len(transmq) < maxwin)) { +			__skb_queue_tail(transmq, skb); +			tipc_bearer_send(net, link->bearer_id, skb, addr); +			link->rcv_unacked = 0; +			seqno++; +			continue; +		} +		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {  			link->stats.sent_bundled++;  			continue; -		} else if (tipc_msg_make_bundle(outqueue, skb, mtu, -						link->addr)) { +		} +		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {  			link->stats.sent_bundled++;  			link->stats.sent_bundles++; -			if (!link->next_out) -				link->next_out = skb_peek_tail(outqueue); -		} else { -			__skb_queue_tail(outqueue, skb); -			if (!link->next_out) -				link->next_out = skb; +			imp = msg_importance(buf_msg(skb));  		} +		__skb_queue_tail(backlogq, skb); +		link->backlog[imp].len++;  		seqno++;  	}  	link->next_out_no = seqno; @@ -808,13 +765,25 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)  	return __tipc_link_xmit(link->owner->net, link, &head);  } +/* tipc_link_xmit_skb(): send single buffer to destination + * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE + * messages, which will not be rejected + * The only exception is datagram messages rerouted after secondary + * lookup, which are rare and safe to dispose of anyway. + * TODO: Return real return value, and let callers use + * tipc_wait_for_sendpkt() where applicable + */  int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,  		       u32 selector)  {  	struct sk_buff_head head; +	int rc;  	skb2list(skb, &head); -	return tipc_link_xmit(net, &head, dnode, selector); +	rc = tipc_link_xmit(net, &head, dnode, selector); +	if (rc == -ELINKCONG) +		kfree_skb(skb); +	return 0;  }  /** @@ -841,12 +810,15 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,  		if (link)  			rc = __tipc_link_xmit(net, link, list);  		tipc_node_unlock(node); +		tipc_node_put(node);  	}  	if (link)  		return rc; -	if (likely(in_own_node(net, dnode))) -		return tipc_sk_rcv(net, list); +	if (likely(in_own_node(net, dnode))) { +		tipc_sk_rcv(net, list); +		return 0; +	}  	__skb_queue_purge(list);  	return rc; @@ -893,14 +865,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)  	kfree_skb(buf);  } -struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list, -				    const struct sk_buff *skb) -{ -	if (skb_queue_is_last(list, skb)) -		return NULL; -	return skb->next; -} -  /*   * tipc_link_push_packets - push unsent packets to bearer   * @@ -909,30 +873,24 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,   *   * Called with node locked   */ -void tipc_link_push_packets(struct tipc_link *l_ptr) +void tipc_link_push_packets(struct tipc_link *link)  { -	struct sk_buff_head *outqueue = &l_ptr->outqueue; -	struct sk_buff *skb = l_ptr->next_out; +	struct sk_buff *skb;  	struct tipc_msg *msg; -	u32 next, first; +	unsigned int ack = mod(link->next_in_no - 1); -	skb_queue_walk_from(outqueue, skb) { -		msg = buf_msg(skb); -		next = msg_seqno(msg); -		first = buf_seqno(skb_peek(outqueue)); - -		if (mod(next - first) < l_ptr->queue_limit[0]) { -			msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); -			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -			if (msg_user(msg) == MSG_BUNDLER) -				TIPC_SKB_CB(skb)->bundling = false; -			tipc_bearer_send(l_ptr->owner->net, -					 l_ptr->bearer_id, skb, -					 &l_ptr->media_addr); -			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb); -		} else { +	while (skb_queue_len(&link->transmq) < link->window) { +		skb = __skb_dequeue(&link->backlogq); +		if (!skb)  			break; -		} +		msg = buf_msg(skb); +		link->backlog[msg_importance(msg)].len--; +		msg_set_ack(msg, ack); +		msg_set_bcast_ack(msg, link->owner->bclink.last_in); +		link->rcv_unacked = 0; +		__skb_queue_tail(&link->transmq, skb); +		tipc_bearer_send(link->owner->net, link->bearer_id, +				 skb, &link->media_addr);  	}  } @@ -979,7 +937,6 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,  			(unsigned long) TIPC_SKB_CB(buf)->handle);  		n_ptr = tipc_bclink_retransmit_to(net); -		tipc_node_lock(n_ptr);  		tipc_addr_string_fill(addr_string, n_ptr->addr);  		pr_info("Broadcast link info for %s\n", addr_string); @@ -991,9 +948,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,  			n_ptr->bclink.oos_state,  			n_ptr->bclink.last_sent); -		tipc_node_unlock(n_ptr); - -		tipc_bclink_set_flags(net, TIPC_BCLINK_RESET); +		n_ptr->action_flags |= TIPC_BCAST_RESET;  		l_ptr->stale_count = 0;  	}  } @@ -1019,8 +974,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,  		l_ptr->stale_count = 1;  	} -	skb_queue_walk_from(&l_ptr->outqueue, skb) { -		if (!retransmits || skb == l_ptr->next_out) +	skb_queue_walk_from(&l_ptr->transmq, skb) { +		if (!retransmits)  			break;  		msg = buf_msg(skb);  		msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); @@ -1032,72 +987,43 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,  	}  } -static void link_retrieve_defq(struct tipc_link *link, -			       struct sk_buff_head *list) -{ -	u32 seq_no; - -	if (skb_queue_empty(&link->deferred_queue)) -		return; - -	seq_no = buf_seqno(skb_peek(&link->deferred_queue)); -	if (seq_no == mod(link->next_in_no)) -		skb_queue_splice_tail_init(&link->deferred_queue, list); -} - -/** - * link_recv_buf_validate - validate basic format of received message - * - * This routine ensures a TIPC message has an acceptable header, and at least - * as much data as the header indicates it should.  The routine also ensures - * that the entire message header is stored in the main fragment of the message - * buffer, to simplify future access to message header fields. - * - * Note: Having extra info present in the message header or data areas is OK. - * TIPC will ignore the excess, under the assumption that it is optional info - * introduced by a later release of the protocol. +/* link_synch(): check if all packets arrived before the synch + *               point have been consumed + * Returns true if the parallel links are synched, otherwise false   */ -static int link_recv_buf_validate(struct sk_buff *buf) +static bool link_synch(struct tipc_link *l)  { -	static u32 min_data_hdr_size[8] = { -		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE, -		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE -		}; +	unsigned int post_synch; +	struct tipc_link *pl; -	struct tipc_msg *msg; -	u32 tipc_hdr[2]; -	u32 size; -	u32 hdr_size; -	u32 min_hdr_size; +	pl  = tipc_parallel_link(l); +	if (pl == l) +		goto synched; -	/* If this packet comes from the defer queue, the skb has already -	 * been validated -	 */ -	if (unlikely(TIPC_SKB_CB(buf)->deferred)) -		return 1; - -	if (unlikely(buf->len < MIN_H_SIZE)) -		return 0; - -	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr); -	if (msg == NULL) -		return 0; +	/* Was last pre-synch packet added to input queue ? */ +	if (less_eq(pl->next_in_no, l->synch_point)) +		return false; -	if (unlikely(msg_version(msg) != TIPC_VERSION)) -		return 0; +	/* Is it still in the input queue ? */ +	post_synch = mod(pl->next_in_no - l->synch_point) - 1; +	if (skb_queue_len(&pl->inputq) > post_synch) +		return false; +synched: +	l->flags &= ~LINK_SYNCHING; +	return true; +} -	size = msg_size(msg); -	hdr_size = msg_hdr_sz(msg); -	min_hdr_size = msg_isdata(msg) ? -		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE; +static void link_retrieve_defq(struct tipc_link *link, +			       struct sk_buff_head *list) +{ +	u32 seq_no; -	if (unlikely((hdr_size < min_hdr_size) || -		     (size < hdr_size) || -		     (buf->len < size) || -		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE))) -		return 0; +	if (skb_queue_empty(&link->deferdq)) +		return; -	return pskb_may_pull(buf, hdr_size); +	seq_no = buf_seqno(skb_peek(&link->deferdq)); +	if (seq_no == mod(link->next_in_no)) +		skb_queue_splice_tail_init(&link->deferdq, list);  }  /** @@ -1125,16 +1051,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  	while ((skb = __skb_dequeue(&head))) {  		/* Ensure message is well-formed */ -		if (unlikely(!link_recv_buf_validate(skb))) -			goto discard; - -		/* Ensure message data is a single contiguous unit */ -		if (unlikely(skb_linearize(skb))) +		if (unlikely(!tipc_msg_validate(skb)))  			goto discard;  		/* Handle arrival of a non-unicast link message */  		msg = buf_msg(skb); -  		if (unlikely(msg_non_seq(msg))) {  			if (msg_user(msg) ==  LINK_CONFIG)  				tipc_disc_rcv(net, skb, b_ptr); @@ -1152,8 +1073,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  		n_ptr = tipc_node_find(net, msg_prevnode(msg));  		if (unlikely(!n_ptr))  			goto discard; -		tipc_node_lock(n_ptr); +		tipc_node_lock(n_ptr);  		/* Locate unicast link endpoint that should handle message */  		l_ptr = n_ptr->links[b_ptr->identity];  		if (unlikely(!l_ptr)) @@ -1175,21 +1096,20 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  		ackd = msg_ack(msg);  		/* Release acked messages */ -		if (n_ptr->bclink.recv_permitted) +		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))  			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));  		released = 0; -		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) { -			if (skb1 == l_ptr->next_out || -			    more(buf_seqno(skb1), ackd)) +		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) { +			if (more(buf_seqno(skb1), ackd))  				break; -			 __skb_unlink(skb1, &l_ptr->outqueue); +			 __skb_unlink(skb1, &l_ptr->transmq);  			 kfree_skb(skb1);  			 released = 1;  		}  		/* Try sending any messages link endpoint has pending */ -		if (unlikely(l_ptr->next_out)) +		if (unlikely(skb_queue_len(&l_ptr->backlogq)))  			tipc_link_push_packets(l_ptr);  		if (released && !skb_queue_empty(&l_ptr->wakeupq)) @@ -1223,18 +1143,26 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)  			skb = NULL;  			goto unlock;  		} +		/* Synchronize with parallel link if applicable */ +		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) { +			link_handle_out_of_seq_msg(l_ptr, skb); +			if (link_synch(l_ptr)) +				link_retrieve_defq(l_ptr, &head); +			skb = NULL; +			goto unlock; +		}  		l_ptr->next_in_no++; -		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) +		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))  			link_retrieve_defq(l_ptr, &head); - -		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { +		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {  			l_ptr->stats.sent_acks++; -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);  		}  		tipc_link_input(l_ptr, skb);  		skb = NULL;  unlock:  		tipc_node_unlock(n_ptr); +		tipc_node_put(n_ptr);  discard:  		if (unlikely(skb))  			kfree_skb(skb); @@ -1271,7 +1199,7 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)  			node->action_flags |= TIPC_NAMED_MSG_EVT;  		return true;  	case MSG_BUNDLER: -	case CHANGEOVER_PROTOCOL: +	case TUNNEL_PROTOCOL:  	case MSG_FRAGMENTER:  	case BCAST_PROTOCOL:  		return false; @@ -1298,8 +1226,14 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)  		return;  	switch (msg_user(msg)) { -	case CHANGEOVER_PROTOCOL: -		if (!tipc_link_tunnel_rcv(node, &skb)) +	case TUNNEL_PROTOCOL: +		if (msg_dup(msg)) { +			link->flags |= LINK_SYNCHING; +			link->synch_point = msg_seqno(msg_get_wrapped(msg)); +			kfree_skb(skb); +			break; +		} +		if (!tipc_link_failover_rcv(link, &skb))  			break;  		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {  			tipc_data_input(link, skb); @@ -1394,11 +1328,10 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  		return;  	} -	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) { +	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {  		l_ptr->stats.deferred_recv++; -		TIPC_SKB_CB(buf)->deferred = true; -		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1) -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); +		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);  	} else {  		l_ptr->stats.duplicates++;  	} @@ -1408,15 +1341,15 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,   * Send protocol message to the other endpoint.   */  void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, -			  u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) +			  u32 gap, u32 tolerance, u32 priority)  {  	struct sk_buff *buf = NULL;  	struct tipc_msg *msg = l_ptr->pmsg;  	u32 msg_size = sizeof(l_ptr->proto_msg);  	int r_flag; -	/* Don't send protocol message during link changeover */ -	if (l_ptr->exp_msg_count) +	/* Don't send protocol message during link failover */ +	if (l_ptr->flags & LINK_FAILINGOVER)  		return;  	/* Abort non-RESET send if communication with node is prohibited */ @@ -1434,11 +1367,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,  		if (!tipc_link_is_up(l_ptr))  			return; -		if (l_ptr->next_out) -			next_sent = buf_seqno(l_ptr->next_out); +		if (skb_queue_len(&l_ptr->backlogq)) +			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));  		msg_set_next_sent(msg, next_sent); -		if (!skb_queue_empty(&l_ptr->deferred_queue)) { -			u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue)); +		if (!skb_queue_empty(&l_ptr->deferdq)) { +			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));  			gap = mod(rec - mod(l_ptr->next_in_no));  		}  		msg_set_seq_gap(msg, gap); @@ -1446,35 +1379,20 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,  			l_ptr->stats.sent_nacks++;  		msg_set_link_tolerance(msg, tolerance);  		msg_set_linkprio(msg, priority); -		msg_set_max_pkt(msg, ack_mtu); +		msg_set_max_pkt(msg, l_ptr->mtu);  		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));  		msg_set_probe(msg, probe_msg != 0); -		if (probe_msg) { -			u32 mtu = l_ptr->max_pkt; - -			if ((mtu < l_ptr->max_pkt_target) && -			    link_working_working(l_ptr) && -			    l_ptr->fsm_msg_cnt) { -				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; -				if (l_ptr->max_pkt_probes == 10) { -					l_ptr->max_pkt_target = (msg_size - 4); -					l_ptr->max_pkt_probes = 0; -					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; -				} -				l_ptr->max_pkt_probes++; -			} - +		if (probe_msg)  			l_ptr->stats.sent_probes++; -		}  		l_ptr->stats.sent_states++;  	} else {		/* RESET_MSG or ACTIVATE_MSG */ -		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); +		msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));  		msg_set_seq_gap(msg, 0);  		msg_set_next_sent(msg, 1);  		msg_set_probe(msg, 0);  		msg_set_link_tolerance(msg, l_ptr->tolerance);  		msg_set_linkprio(msg, l_ptr->priority); -		msg_set_max_pkt(msg, l_ptr->max_pkt_target); +		msg_set_max_pkt(msg, l_ptr->advertised_mtu);  	}  	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); @@ -1490,10 +1408,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,  	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));  	buf->priority = TC_PRIO_CONTROL; -  	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,  			 &l_ptr->media_addr); -	l_ptr->unacked_window = 0; +	l_ptr->rcv_unacked = 0;  	kfree_skb(buf);  } @@ -1506,13 +1423,10 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,  				struct sk_buff *buf)  {  	u32 rec_gap = 0; -	u32 max_pkt_info; -	u32 max_pkt_ack;  	u32 msg_tol;  	struct tipc_msg *msg = buf_msg(buf); -	/* Discard protocol message during link changeover */ -	if (l_ptr->exp_msg_count) +	if (l_ptr->flags & LINK_FAILINGOVER)  		goto exit;  	if (l_ptr->net_plane != msg_net_plane(msg)) @@ -1551,15 +1465,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,  		if (msg_linkprio(msg) > l_ptr->priority)  			l_ptr->priority = msg_linkprio(msg); -		max_pkt_info = msg_max_pkt(msg); -		if (max_pkt_info) { -			if (max_pkt_info < l_ptr->max_pkt_target) -				l_ptr->max_pkt_target = max_pkt_info; -			if (l_ptr->max_pkt > l_ptr->max_pkt_target) -				l_ptr->max_pkt = l_ptr->max_pkt_target; -		} else { -			l_ptr->max_pkt = l_ptr->max_pkt_target; -		} +		if (l_ptr->mtu > msg_max_pkt(msg)) +			l_ptr->mtu = msg_max_pkt(msg);  		/* Synchronize broadcast link info, if not done previously */  		if (!tipc_node_is_up(l_ptr->owner)) { @@ -1604,18 +1511,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,  				      mod(l_ptr->next_in_no));  		} -		max_pkt_ack = msg_max_pkt(msg); -		if (max_pkt_ack > l_ptr->max_pkt) { -			l_ptr->max_pkt = max_pkt_ack; -			l_ptr->max_pkt_probes = 0; -		} - -		max_pkt_ack = 0; -		if (msg_probe(msg)) { +		if (msg_probe(msg))  			l_ptr->stats.recv_probes++; -			if (msg_size(msg) > sizeof(l_ptr->proto_msg)) -				max_pkt_ack = msg_size(msg); -		}  		/* Protocol message before retransmits, reduce loss risk */  		if (l_ptr->owner->bclink.recv_permitted) @@ -1623,12 +1520,12 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,  						      msg_last_bcast(msg));  		if (rec_gap || (msg_probe(msg))) { -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0, -					     0, max_pkt_ack); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, +					     rec_gap, 0, 0);  		}  		if (msg_seq_gap(msg)) {  			l_ptr->stats.recv_nacks++; -			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue), +			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),  					     msg_seq_gap(msg));  		}  		break; @@ -1675,7 +1572,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,   */  void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  { -	u32 msgcount = skb_queue_len(&l_ptr->outqueue); +	int msgcount;  	struct tipc_link *tunnel = l_ptr->owner->active_links[0];  	struct tipc_msg tunnel_hdr;  	struct sk_buff *skb; @@ -1684,12 +1581,15 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  	if (!tunnel)  		return; -	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, -		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); +	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, +		      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); +	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); +	tipc_link_purge_backlog(l_ptr); +	msgcount = skb_queue_len(&l_ptr->transmq);  	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);  	msg_set_msgcnt(&tunnel_hdr, msgcount); -	if (skb_queue_empty(&l_ptr->outqueue)) { +	if (skb_queue_empty(&l_ptr->transmq)) {  		skb = tipc_buf_acquire(INT_H_SIZE);  		if (skb) {  			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); @@ -1705,7 +1605,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  	split_bundles = (l_ptr->owner->active_links[0] !=  			 l_ptr->owner->active_links[1]); -	skb_queue_walk(&l_ptr->outqueue, skb) { +	skb_queue_walk(&l_ptr->transmq, skb) {  		struct tipc_msg *msg = buf_msg(skb);  		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { @@ -1736,157 +1636,105 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)   * and sequence order is preserved per sender/receiver socket pair.   * Owner node is locked.   */ -void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, -			      struct tipc_link *tunnel) +void tipc_link_dup_queue_xmit(struct tipc_link *link, +			      struct tipc_link *tnl)  {  	struct sk_buff *skb; -	struct tipc_msg tunnel_hdr; - -	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL, -		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); -	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue)); -	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); -	skb_queue_walk(&l_ptr->outqueue, skb) { +	struct tipc_msg tnl_hdr; +	struct sk_buff_head *queue = &link->transmq; +	int mcnt; + +	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, +		      SYNCH_MSG, INT_H_SIZE, link->addr); +	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); +	msg_set_msgcnt(&tnl_hdr, mcnt); +	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); + +tunnel_queue: +	skb_queue_walk(queue, skb) {  		struct sk_buff *outskb;  		struct tipc_msg *msg = buf_msg(skb); -		u32 length = msg_size(msg); +		u32 len = msg_size(msg); -		if (msg_user(msg) == MSG_BUNDLER) -			msg_set_type(msg, CLOSED_MSG); -		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */ -		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -		msg_set_size(&tunnel_hdr, length + INT_H_SIZE); -		outskb = tipc_buf_acquire(length + INT_H_SIZE); +		msg_set_ack(msg, mod(link->next_in_no - 1)); +		msg_set_bcast_ack(msg, link->owner->bclink.last_in); +		msg_set_size(&tnl_hdr, len + INT_H_SIZE); +		outskb = tipc_buf_acquire(len + INT_H_SIZE);  		if (outskb == NULL) {  			pr_warn("%sunable to send duplicate msg\n",  				link_co_err);  			return;  		} -		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE); -		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data, -					       length); -		__tipc_link_xmit_skb(tunnel, outskb); -		if (!tipc_link_is_up(l_ptr)) +		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); +		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, +					       skb->data, len); +		__tipc_link_xmit_skb(tnl, outskb); +		if (!tipc_link_is_up(link))  			return;  	} -} - -/** - * buf_extract - extracts embedded TIPC message from another message - * @skb: encapsulating message buffer - * @from_pos: offset to extract from - * - * Returns a new message buffer containing an embedded message.  The - * encapsulating buffer is left unchanged. - */ -static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) -{ -	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos); -	u32 size = msg_size(msg); -	struct sk_buff *eb; - -	eb = tipc_buf_acquire(size); -	if (eb) -		skb_copy_to_linear_data(eb, msg, size); -	return eb; -} - -/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. - * Owner node is locked. - */ -static void tipc_link_dup_rcv(struct tipc_link *l_ptr, -			      struct sk_buff *t_buf) -{ -	struct sk_buff *buf; - -	if (!tipc_link_is_up(l_ptr)) +	if (queue == &link->backlogq)  		return; - -	buf = buf_extract(t_buf, INT_H_SIZE); -	if (buf == NULL) { -		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err); -		return; -	} - -	/* Add buffer to deferred queue, if applicable: */ -	link_handle_out_of_seq_msg(l_ptr, buf); +	queue = &link->backlogq; +	goto tunnel_queue;  } -/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet +/*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet   *  Owner node is locked.   */ -static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, -					      struct sk_buff *t_buf) +static bool tipc_link_failover_rcv(struct tipc_link *link, +				   struct sk_buff **skb)  { -	struct tipc_msg *t_msg = buf_msg(t_buf); -	struct sk_buff *buf = NULL; -	struct tipc_msg *msg; - -	if (tipc_link_is_up(l_ptr)) -		tipc_link_reset(l_ptr); - -	/* First failover packet? */ -	if (l_ptr->exp_msg_count == START_CHANGEOVER) -		l_ptr->exp_msg_count = msg_msgcnt(t_msg); - -	/* Should there be an inner packet? */ -	if (l_ptr->exp_msg_count) { -		l_ptr->exp_msg_count--; -		buf = buf_extract(t_buf, INT_H_SIZE); -		if (buf == NULL) { -			pr_warn("%sno inner failover pkt\n", link_co_err); -			goto exit; -		} -		msg = buf_msg(buf); +	struct tipc_msg *msg = buf_msg(*skb); +	struct sk_buff *iskb = NULL; +	struct tipc_link *pl = NULL; +	int bearer_id = msg_bearer_id(msg); +	int pos = 0; -		if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) { -			kfree_skb(buf); -			buf = NULL; -			goto exit; -		} -		if (msg_user(msg) == MSG_FRAGMENTER) { -			l_ptr->stats.recv_fragments++; -			tipc_buf_append(&l_ptr->reasm_buf, &buf); -		} +	if (msg_type(msg) != FAILOVER_MSG) { +		pr_warn("%sunknown tunnel pkt received\n", link_co_err); +		goto exit;  	} -exit: -	if ((!l_ptr->exp_msg_count) && (l_ptr->flags & LINK_STOPPED)) -		tipc_link_delete(l_ptr); -	return buf; -} +	if (bearer_id >= MAX_BEARERS) +		goto exit; -/*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent - *  via other link as result of a failover (ORIGINAL_MSG) or - *  a new active link (DUPLICATE_MSG). Failover packets are - *  returned to the active link for delivery upwards. - *  Owner node is locked. - */ -static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, -				struct sk_buff **buf) -{ -	struct sk_buff *t_buf = *buf; -	struct tipc_link *l_ptr; -	struct tipc_msg *t_msg = buf_msg(t_buf); -	u32 bearer_id = msg_bearer_id(t_msg); +	if (bearer_id == link->bearer_id) +		goto exit; -	*buf = NULL; +	pl = link->owner->links[bearer_id]; +	if (pl && tipc_link_is_up(pl)) +		tipc_link_reset(pl); -	if (bearer_id >= MAX_BEARERS) +	if (link->failover_pkts == FIRST_FAILOVER) +		link->failover_pkts = msg_msgcnt(msg); + +	/* Should we expect an inner packet? */ +	if (!link->failover_pkts)  		goto exit; -	l_ptr = n_ptr->links[bearer_id]; -	if (!l_ptr) +	if (!tipc_msg_extract(*skb, &iskb, &pos)) { +		pr_warn("%sno inner failover pkt\n", link_co_err); +		*skb = NULL;  		goto exit; +	} +	link->failover_pkts--; +	*skb = NULL; -	if (msg_type(t_msg) == DUPLICATE_MSG) -		tipc_link_dup_rcv(l_ptr, t_buf); -	else if (msg_type(t_msg) == ORIGINAL_MSG) -		*buf = tipc_link_failover_rcv(l_ptr, t_buf); -	else -		pr_warn("%sunknown tunnel pkt received\n", link_co_err); +	/* Was this packet already delivered? */ +	if (less(buf_seqno(iskb), link->failover_checkpt)) { +		kfree_skb(iskb); +		iskb = NULL; +		goto exit; +	} +	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { +		link->stats.recv_fragments++; +		tipc_buf_append(&link->failover_skb, &iskb); +	}  exit: -	kfree_skb(t_buf); -	return *buf != NULL; +	if (!link->failover_pkts && pl) +		pl->flags &= ~LINK_FAILINGOVER; +	kfree_skb(*skb); +	*skb = iskb; +	return *skb;  }  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) @@ -1901,23 +1749,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)  	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);  } -void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window) +void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)  { -	/* Data messages from this node, inclusive FIRST_FRAGM */ -	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window; -	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4; -	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5; -	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6; -	/* Transiting data messages,inclusive FIRST_FRAGM */ -	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300; -	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600; -	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900; -	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200; -	l_ptr->queue_limit[CONN_MANAGER] = 1200; -	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500; -	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000; -	/* FRAGMENT and LAST_FRAGMENT packets */ -	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; +	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); + +	l->window = win; +	l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2; +	l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win; +	l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3; +	l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2; +	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;  }  /* tipc_link_find_owner - locate owner node of link by link's name @@ -2082,14 +1923,14 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)  			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);  			link_set_supervision_props(link, tol); -			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0); +			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);  		}  		if (props[TIPC_NLA_PROP_PRIO]) {  			u32 prio;  			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);  			link->priority = prio; -			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0); +			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio);  		}  		if (props[TIPC_NLA_PROP_WIN]) {  			u32 win; @@ -2194,7 +2035,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,  	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,  			tipc_cluster_mask(tn->own_addr)))  		goto attr_msg_full; -	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt)) +	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->mtu))  		goto attr_msg_full;  	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))  		goto attr_msg_full; @@ -2216,7 +2057,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,  	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))  		goto prop_msg_full;  	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, -			link->queue_limit[TIPC_LOW_IMPORTANCE])) +			link->window))  		goto prop_msg_full;  	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))  		goto prop_msg_full; @@ -2282,7 +2123,6 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)  	msg.seq = cb->nlh->nlmsg_seq;  	rcu_read_lock(); -  	if (prev_node) {  		node = tipc_node_find(net, prev_node);  		if (!node) { @@ -2295,6 +2135,7 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)  			cb->prev_seq = 1;  			goto out;  		} +		tipc_node_put(node);  		list_for_each_entry_continue_rcu(node, &tn->node_list,  						 list) { @@ -2302,6 +2143,7 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)  			err = __tipc_nl_add_node_links(net, &msg, node,  						       &prev_link);  			tipc_node_unlock(node); +			tipc_node_put(node);  			if (err)  				goto out; diff --git a/net/tipc/link.h b/net/tipc/link.h index 7aeb52092bf3..b5b4e3554d4e 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -58,8 +58,10 @@  /* Link endpoint execution states   */ -#define LINK_STARTED    0x0001 -#define LINK_STOPPED    0x0002 +#define LINK_STARTED     0x0001 +#define LINK_STOPPED     0x0002 +#define LINK_SYNCHING    0x0004 +#define LINK_FAILINGOVER 0x0008  /* Starting value for maximum packet size negotiation on unicast links   * (unless bearer MTU is less) @@ -118,13 +120,13 @@ struct tipc_stats {   * @pmsg: convenience pointer to "proto_msg" field   * @priority: current link priority   * @net_plane: current link network plane ('A' through 'H') - * @queue_limit: outbound message queue congestion thresholds (indexed by user) + * @backlog_limit: backlog queue congestion thresholds (indexed by importance)   * @exp_msg_count: # of tunnelled messages expected during link changeover   * @reset_checkpoint: seq # of last acknowledged message at time of link reset - * @max_pkt: current maximum packet size for this link - * @max_pkt_target: desired maximum packet size for this link - * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) - * @outqueue: outbound message queue + * @mtu: current maximum packet size for this link + * @advertised_mtu: advertised own mtu when link is being established + * @transmitq: queue for sent, non-acked messages + * @backlogq: queue for messages waiting to be sent   * @next_out_no: next sequence number to use for outbound messages   * @last_retransmitted: sequence number of most recently retransmitted message   * @stale_count: # of identical retransmit requests made by peer @@ -165,36 +167,40 @@ struct tipc_link {  	struct tipc_msg *pmsg;  	u32 priority;  	char net_plane; -	u32 queue_limit[15];	/* queue_limit[0]==window limit */ +	u16 synch_point; -	/* Changeover */ -	u32 exp_msg_count; -	u32 reset_checkpoint; +	/* Failover */ +	u16 failover_pkts; +	u16 failover_checkpt; +	struct sk_buff *failover_skb;  	/* Max packet negotiation */ -	u32 max_pkt; -	u32 max_pkt_target; -	u32 max_pkt_probes; +	u16 mtu; +	u16 advertised_mtu;  	/* Sending */ -	struct sk_buff_head outqueue; +	struct sk_buff_head transmq; +	struct sk_buff_head backlogq; +	struct { +		u16 len; +		u16 limit; +	} backlog[5];  	u32 next_out_no; +	u32 window;  	u32 last_retransmitted;  	u32 stale_count;  	/* Reception */  	u32 next_in_no; -	struct sk_buff_head deferred_queue; -	u32 unacked_window; +	u32 rcv_unacked; +	struct sk_buff_head deferdq;  	struct sk_buff_head inputq;  	struct sk_buff_head namedq;  	/* Congestion handling */ -	struct sk_buff *next_out;  	struct sk_buff_head wakeupq;  	/* Fragmentation/reassembly */ -	u32 long_msg_seq_no;  	struct sk_buff *reasm_buf;  	/* Statistics */ @@ -225,7 +231,7 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,  int __tipc_link_xmit(struct net *net, struct tipc_link *link,  		     struct sk_buff_head *list);  void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, -			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); +			  u32 gap, u32 tolerance, u32 priority);  void tipc_link_push_packets(struct tipc_link *l_ptr);  u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);  void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); @@ -302,9 +308,4 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)  	return l_ptr->state == RESET_RESET;  } -static inline int link_congested(struct tipc_link *l_ptr) -{ -	return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0]; -} -  #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index b6eb90cd3ef7..c3e96e815418 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -1,7 +1,7 @@  /*   * net/tipc/msg.c: TIPC message header routines   * - * Copyright (c) 2000-2006, 2014, Ericsson AB + * Copyright (c) 2000-2006, 2014-2015, Ericsson AB   * Copyright (c) 2005, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -165,6 +165,9 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)  	}  	if (fragid == LAST_FRAGMENT) { +		TIPC_SKB_CB(head)->validated = false; +		if (unlikely(!tipc_msg_validate(head))) +			goto err;  		*buf = head;  		TIPC_SKB_CB(head)->tail = NULL;  		*headbuf = NULL; @@ -172,7 +175,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)  	}  	*buf = NULL;  	return 0; -  err:  	pr_warn_ratelimited("Unable to build fragment list\n");  	kfree_skb(*buf); @@ -181,6 +183,48 @@ err:  	return 0;  } +/* tipc_msg_validate - validate basic format of received message + * + * This routine ensures a TIPC message has an acceptable header, and at least + * as much data as the header indicates it should.  The routine also ensures + * that the entire message header is stored in the main fragment of the message + * buffer, to simplify future access to message header fields. + * + * Note: Having extra info present in the message header or data areas is OK. + * TIPC will ignore the excess, under the assumption that it is optional info + * introduced by a later release of the protocol. + */ +bool tipc_msg_validate(struct sk_buff *skb) +{ +	struct tipc_msg *msg; +	int msz, hsz; + +	if (unlikely(TIPC_SKB_CB(skb)->validated)) +		return true; +	if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE))) +		return false; + +	hsz = msg_hdr_sz(buf_msg(skb)); +	if (unlikely(hsz < MIN_H_SIZE) || (hsz > MAX_H_SIZE)) +		return false; +	if (unlikely(!pskb_may_pull(skb, hsz))) +		return false; + +	msg = buf_msg(skb); +	if (unlikely(msg_version(msg) != TIPC_VERSION)) +		return false; + +	msz = msg_size(msg); +	if (unlikely(msz < hsz)) +		return false; +	if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE)) +		return false; +	if (unlikely(skb->len < msz)) +		return false; + +	TIPC_SKB_CB(skb)->validated = true; +	return true; +}  /**   * tipc_msg_build - create buffer chain containing specified header and data @@ -228,6 +272,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  		      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));  	msg_set_size(&pkthdr, pktmax);  	msg_set_fragm_no(&pkthdr, pktno); +	msg_set_importance(&pkthdr, msg_importance(mhdr));  	/* Prepare first fragment */  	skb = tipc_buf_acquire(pktmax); @@ -286,33 +331,36 @@ error:  /**   * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one - * @list: the buffer chain of the existing buffer ("bundle") + * @bskb: the buffer to append to ("bundle")   * @skb:  buffer to be appended   * @mtu:  max allowable size for the bundle buffer   * Consumes buffer if successful   * Returns true if bundling could be performed, otherwise false   */ -bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu) +bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)  { -	struct sk_buff *bskb = skb_peek_tail(list); -	struct tipc_msg *bmsg = buf_msg(bskb); +	struct tipc_msg *bmsg;  	struct tipc_msg *msg = buf_msg(skb); -	unsigned int bsz = msg_size(bmsg); +	unsigned int bsz;  	unsigned int msz = msg_size(msg); -	u32 start = align(bsz); +	u32 start, pad;  	u32 max = mtu - INT_H_SIZE; -	u32 pad = start - bsz;  	if (likely(msg_user(msg) == MSG_FRAGMENTER))  		return false; -	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) +	if (!bskb) +		return false; +	bmsg = buf_msg(bskb); +	bsz = msg_size(bmsg); +	start = align(bsz); +	pad = start - bsz; + +	if (unlikely(msg_user(msg) == TUNNEL_PROTOCOL))  		return false;  	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))  		return false;  	if (likely(msg_user(bmsg) != MSG_BUNDLER))  		return false; -	if (likely(!TIPC_SKB_CB(bskb)->bundling)) -		return false;  	if (unlikely(skb_tailroom(bskb) < (pad + msz)))  		return false;  	if (unlikely(max < (start + msz))) @@ -328,34 +376,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)  /**   *  tipc_msg_extract(): extract bundled inner packet from buffer - *  @skb: linear outer buffer, to be extracted from. + *  @skb: buffer to be extracted from.   *  @iskb: extracted inner buffer, to be returned - *  @pos: position of msg to be extracted. Returns with pointer of next msg + *  @pos: position in outer message of msg to be extracted. + *        Returns position of next msg   *  Consumes outer buffer when last packet extracted   *  Returns true when when there is an extracted buffer, otherwise false   */  bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)  { -	struct tipc_msg *msg = buf_msg(skb); -	int imsz; -	struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos); +	struct tipc_msg *msg; +	int imsz, offset; -	/* Is there space left for shortest possible message? */ -	if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE)) +	*iskb = NULL; +	if (unlikely(skb_linearize(skb))) +		goto none; + +	msg = buf_msg(skb); +	offset = msg_hdr_sz(msg) + *pos; +	if (unlikely(offset > (msg_size(msg) - MIN_H_SIZE)))  		goto none; -	imsz = msg_size(imsg); -	/* Is there space left for current message ? */ -	if ((*pos + imsz) > msg_data_sz(msg)) +	*iskb = skb_clone(skb, GFP_ATOMIC); +	if (unlikely(!*iskb))  		goto none; -	*iskb = tipc_buf_acquire(imsz); -	if (!*iskb) +	skb_pull(*iskb, offset); +	imsz = msg_size(buf_msg(*iskb)); +	skb_trim(*iskb, imsz); +	if (unlikely(!tipc_msg_validate(*iskb)))  		goto none; -	skb_copy_to_linear_data(*iskb, imsg, imsz);  	*pos += align(imsz);  	return true;  none:  	kfree_skb(skb); +	kfree_skb(*iskb);  	*iskb = NULL;  	return false;  } @@ -369,18 +423,17 @@ none:   * Replaces buffer if successful   * Returns true if success, otherwise false   */ -bool tipc_msg_make_bundle(struct sk_buff_head *list, -			  struct sk_buff *skb, u32 mtu, u32 dnode) +bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)  {  	struct sk_buff *bskb;  	struct tipc_msg *bmsg; -	struct tipc_msg *msg = buf_msg(skb); +	struct tipc_msg *msg = buf_msg(*skb);  	u32 msz = msg_size(msg);  	u32 max = mtu - INT_H_SIZE;  	if (msg_user(msg) == MSG_FRAGMENTER)  		return false; -	if (msg_user(msg) == CHANGEOVER_PROTOCOL) +	if (msg_user(msg) == TUNNEL_PROTOCOL)  		return false;  	if (msg_user(msg) == BCAST_PROTOCOL)  		return false; @@ -398,9 +451,9 @@ bool tipc_msg_make_bundle(struct sk_buff_head *list,  	msg_set_seqno(bmsg, msg_seqno(msg));  	msg_set_ack(bmsg, msg_ack(msg));  	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); -	TIPC_SKB_CB(bskb)->bundling = true; -	__skb_queue_tail(list, bskb); -	return tipc_msg_bundle(list, skb, mtu); +	tipc_msg_bundle(bskb, *skb, mtu); +	*skb = bskb; +	return true;  }  /** @@ -415,21 +468,17 @@ bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,  		      int err)  {  	struct tipc_msg *msg = buf_msg(buf); -	uint imp = msg_importance(msg);  	struct tipc_msg ohdr;  	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);  	if (skb_linearize(buf))  		goto exit; +	msg = buf_msg(buf);  	if (msg_dest_droppable(msg))  		goto exit;  	if (msg_errcode(msg))  		goto exit; -  	memcpy(&ohdr, msg, msg_hdr_sz(msg)); -	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE); -	if (msg_isdata(msg)) -		msg_set_importance(msg, imp);  	msg_set_errcode(msg, err);  	msg_set_origport(msg, msg_destport(&ohdr));  	msg_set_destport(msg, msg_origport(&ohdr)); @@ -462,15 +511,18 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,  {  	struct tipc_msg *msg = buf_msg(skb);  	u32 dport; +	u32 own_addr = tipc_own_addr(net);  	if (!msg_isdata(msg))  		return false;  	if (!msg_named(msg))  		return false; +	if (msg_errcode(msg)) +		return false;  	*err = -TIPC_ERR_NO_NAME;  	if (skb_linearize(skb))  		return false; -	if (msg_reroute_cnt(msg) > 0) +	if (msg_reroute_cnt(msg))  		return false;  	*dnode = addr_domain(net, msg_lookup_scope(msg));  	dport = tipc_nametbl_translate(net, msg_nametype(msg), @@ -478,6 +530,8 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,  	if (!dport)  		return false;  	msg_incr_reroute_cnt(msg); +	if (*dnode != own_addr) +		msg_set_prevnode(msg, own_addr);  	msg_set_destnode(msg, *dnode);  	msg_set_destport(msg, dport);  	*err = TIPC_OK; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 9ace47f44a69..e1d3595e2ee9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@  /*   * net/tipc/msg.h: Include file for TIPC message header routines   * - * Copyright (c) 2000-2007, 2014, Ericsson AB + * Copyright (c) 2000-2007, 2014-2015 Ericsson AB   * Copyright (c) 2005-2008, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -54,6 +54,8 @@ struct plist;   * - TIPC_HIGH_IMPORTANCE   * - TIPC_CRITICAL_IMPORTANCE   */ +#define TIPC_SYSTEM_IMPORTANCE	4 +  /*   * Payload message types @@ -64,6 +66,19 @@ struct plist;  #define TIPC_DIRECT_MSG		3  /* + * Internal message users + */ +#define  BCAST_PROTOCOL       5 +#define  MSG_BUNDLER          6 +#define  LINK_PROTOCOL        7 +#define  CONN_MANAGER         8 +#define  TUNNEL_PROTOCOL      10 +#define  NAME_DISTRIBUTOR     11 +#define  MSG_FRAGMENTER       12 +#define  LINK_CONFIG          13 +#define  SOCK_WAKEUP          14       /* pseudo user */ + +/*   * Message header sizes   */  #define SHORT_H_SIZE              24	/* In-cluster basic payload message */ @@ -76,7 +91,7 @@ struct plist;  #define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE) -#define TIPC_MEDIA_ADDR_OFFSET	5 +#define TIPC_MEDIA_INFO_OFFSET	5  /**   * TIPC message buffer code @@ -87,12 +102,12 @@ struct plist;   * Note: Headroom should be a multiple of 4 to ensure the TIPC header fields   *       are word aligned for quicker access   */ -#define BUF_HEADROOM LL_MAX_HEADER +#define BUF_HEADROOM (LL_MAX_HEADER + 48)  struct tipc_skb_cb {  	void *handle;  	struct sk_buff *tail; -	bool deferred; +	bool validated;  	bool wakeup_pending;  	bool bundling;  	u16 chain_sz; @@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 0, 25, 0xf, n);  } -static inline u32 msg_importance(struct tipc_msg *m) -{ -	return msg_bits(m, 0, 25, 0xf); -} - -static inline void msg_set_importance(struct tipc_msg *m, u32 i) -{ -	msg_set_user(m, i); -} -  static inline u32 msg_hdr_sz(struct tipc_msg *m)  {  	return msg_bits(m, 0, 21, 0xf) << 2; @@ -235,6 +240,15 @@ static inline void msg_set_size(struct tipc_msg *m, u32 sz)  	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);  } +static inline unchar *msg_data(struct tipc_msg *m) +{ +	return ((unchar *)m) + msg_hdr_sz(m); +} + +static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) +{ +	return (struct tipc_msg *)msg_data(m); +}  /*   * Word 1 @@ -336,6 +350,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)  /*   * Words 3-10   */ +static inline u32 msg_importance(struct tipc_msg *m) +{ +	if (unlikely(msg_user(m) == MSG_FRAGMENTER)) +		return msg_bits(m, 5, 13, 0x7); +	if (likely(msg_isdata(m) && !msg_errcode(m))) +		return msg_user(m); +	return TIPC_SYSTEM_IMPORTANCE; +} + +static inline void msg_set_importance(struct tipc_msg *m, u32 i) +{ +	if (unlikely(msg_user(m) == MSG_FRAGMENTER)) +		msg_set_bits(m, 5, 13, 0x7, i); +	else if (likely(i < TIPC_SYSTEM_IMPORTANCE)) +		msg_set_user(m, i); +	else +		pr_warn("Trying to set illegal importance in message\n"); +} +  static inline u32 msg_prevnode(struct tipc_msg *m)  {  	return msg_word(m, 3); @@ -348,6 +381,8 @@ static inline void msg_set_prevnode(struct tipc_msg *m, u32 a)  static inline u32 msg_origport(struct tipc_msg *m)  { +	if (msg_user(m) == MSG_FRAGMENTER) +		m = msg_get_wrapped(m);  	return msg_word(m, 4);  } @@ -443,35 +478,11 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)  	msg_set_word(m, 10, n);  } -static inline unchar *msg_data(struct tipc_msg *m) -{ -	return ((unchar *)m) + msg_hdr_sz(m); -} - -static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) -{ -	return (struct tipc_msg *)msg_data(m); -} -  /*   * Constants and routines used to read and write TIPC internal message headers   */  /* - * Internal message users - */ -#define  BCAST_PROTOCOL       5 -#define  MSG_BUNDLER          6 -#define  LINK_PROTOCOL        7 -#define  CONN_MANAGER         8 -#define  ROUTE_DISTRIBUTOR    9		/* obsoleted */ -#define  CHANGEOVER_PROTOCOL  10 -#define  NAME_DISTRIBUTOR     11 -#define  MSG_FRAGMENTER       12 -#define  LINK_CONFIG          13 -#define  SOCK_WAKEUP          14       /* pseudo user */ - -/*   *  Connection management protocol message types   */  #define CONN_PROBE        0 @@ -501,8 +512,8 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)  /*   * Changeover tunnel message types   */ -#define DUPLICATE_MSG		0 -#define ORIGINAL_MSG		1 +#define SYNCH_MSG		0 +#define FAILOVER_MSG		1  /*   * Config protocol message types @@ -510,7 +521,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)  #define DSC_REQ_MSG		0  #define DSC_RESP_MSG		1 -  /*   * Word 1   */ @@ -534,6 +544,24 @@ static inline void msg_set_node_sig(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 1, 0, 0xffff, n);  } +static inline u32 msg_node_capabilities(struct tipc_msg *m) +{ +	return msg_bits(m, 1, 15, 0x1fff); +} + +static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) +{ +	msg_set_bits(m, 1, 15, 0x1fff, n); +} + +static inline bool msg_dup(struct tipc_msg *m) +{ +	if (likely(msg_user(m) != TUNNEL_PROTOCOL)) +		return false; +	if (msg_type(m) != SYNCH_MSG) +		return false; +	return true; +}  /*   * Word 2 @@ -688,7 +716,7 @@ static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)  static inline char *msg_media_addr(struct tipc_msg *m)  { -	return (char *)&m->hdr[TIPC_MEDIA_ADDR_OFFSET]; +	return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];  }  /* @@ -734,21 +762,8 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 9, 0, 0xffff, n);  } -static inline u32 tipc_msg_tot_importance(struct tipc_msg *m) -{ -	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) -		return msg_importance(msg_get_wrapped(m)); -	return msg_importance(m); -} - -static inline u32 msg_tot_origport(struct tipc_msg *m) -{ -	if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) -		return msg_origport(msg_get_wrapped(m)); -	return msg_origport(m); -} -  struct sk_buff *tipc_buf_acquire(u32 size); +bool tipc_msg_validate(struct sk_buff *skb);  bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,  		      int err);  void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, @@ -757,9 +772,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,  				uint data_sz, u32 dnode, u32 onode,  				u32 dport, u32 oport, int errcode);  int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); -bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); -bool tipc_msg_make_bundle(struct sk_buff_head *list, -			  struct sk_buff *skb, u32 mtu, u32 dnode); +bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu); + +bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode);  bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);  int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  		   int offset, int dsz, int mtu, struct sk_buff_head *list); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index fcb07915aaac..41e7b7e4dda0 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -98,7 +98,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)  			continue;  		if (!tipc_node_active_links(node))  			continue; -		oskb = skb_copy(skb, GFP_ATOMIC); +		oskb = pskb_copy(skb, GFP_ATOMIC);  		if (!oskb)  			break;  		msg_set_destnode(buf_msg(oskb), dnode); @@ -244,6 +244,7 @@ static void tipc_publ_subscribe(struct net *net, struct publication *publ,  	tipc_node_lock(node);  	list_add_tail(&publ->nodesub_list, &node->publ_list);  	tipc_node_unlock(node); +	tipc_node_put(node);  }  static void tipc_publ_unsubscribe(struct net *net, struct publication *publ, @@ -258,6 +259,7 @@ static void tipc_publ_unsubscribe(struct net *net, struct publication *publ,  	tipc_node_lock(node);  	list_del_init(&publ->nodesub_list);  	tipc_node_unlock(node); +	tipc_node_put(node);  }  /** diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 105ba7adf06f..ab0ac62a1287 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -811,8 +811,8 @@ static void tipc_purge_publications(struct net *net, struct name_seq *seq)  	sseq = seq->sseqs;  	info = sseq->info;  	list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { -		tipc_nametbl_remove_publ(net, publ->type, publ->lower, -					 publ->node, publ->ref, publ->key); +		tipc_nameseq_remove_publ(net, seq, publ->lower, publ->node, +					 publ->ref, publ->key);  		kfree_rcu(publ, rcu);  	}  	hlist_del_init_rcu(&seq->ns_list); diff --git a/net/tipc/node.c b/net/tipc/node.c index 86152de8248d..22c059ad2999 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -42,6 +42,7 @@  static void node_lost_contact(struct tipc_node *n_ptr);  static void node_established_contact(struct tipc_node *n_ptr); +static void tipc_node_delete(struct tipc_node *node);  struct tipc_sock_conn {  	u32 port; @@ -67,6 +68,23 @@ static unsigned int tipc_hashfn(u32 addr)  	return addr & (NODE_HTABLE_SIZE - 1);  } +static void tipc_node_kref_release(struct kref *kref) +{ +	struct tipc_node *node = container_of(kref, struct tipc_node, kref); + +	tipc_node_delete(node); +} + +void tipc_node_put(struct tipc_node *node) +{ +	kref_put(&node->kref, tipc_node_kref_release); +} + +static void tipc_node_get(struct tipc_node *node) +{ +	kref_get(&node->kref); +} +  /*   * tipc_node_find - locate specified node object, if it exists   */ @@ -82,6 +100,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)  	hlist_for_each_entry_rcu(node, &tn->node_htable[tipc_hashfn(addr)],  				 hash) {  		if (node->addr == addr) { +			tipc_node_get(node);  			rcu_read_unlock();  			return node;  		} @@ -106,12 +125,13 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)  	}  	n_ptr->addr = addr;  	n_ptr->net = net; +	kref_init(&n_ptr->kref);  	spin_lock_init(&n_ptr->lock);  	INIT_HLIST_NODE(&n_ptr->hash);  	INIT_LIST_HEAD(&n_ptr->list);  	INIT_LIST_HEAD(&n_ptr->publ_list);  	INIT_LIST_HEAD(&n_ptr->conn_sks); -	__skb_queue_head_init(&n_ptr->bclink.deferred_queue); +	__skb_queue_head_init(&n_ptr->bclink.deferdq);  	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);  	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {  		if (n_ptr->addr < temp_node->addr) @@ -120,16 +140,17 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)  	list_add_tail_rcu(&n_ptr->list, &temp_node->list);  	n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;  	n_ptr->signature = INVALID_NODE_SIG; +	tipc_node_get(n_ptr);  exit:  	spin_unlock_bh(&tn->node_list_lock);  	return n_ptr;  } -static void tipc_node_delete(struct tipc_net *tn, struct tipc_node *n_ptr) +static void tipc_node_delete(struct tipc_node *node)  { -	list_del_rcu(&n_ptr->list); -	hlist_del_rcu(&n_ptr->hash); -	kfree_rcu(n_ptr, rcu); +	list_del_rcu(&node->list); +	hlist_del_rcu(&node->hash); +	kfree_rcu(node, rcu);  }  void tipc_node_stop(struct net *net) @@ -139,7 +160,7 @@ void tipc_node_stop(struct net *net)  	spin_lock_bh(&tn->node_list_lock);  	list_for_each_entry_safe(node, t_node, &tn->node_list, list) -		tipc_node_delete(tn, node); +		tipc_node_put(node);  	spin_unlock_bh(&tn->node_list_lock);  } @@ -147,6 +168,7 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)  {  	struct tipc_node *node;  	struct tipc_sock_conn *conn; +	int err = 0;  	if (in_own_node(net, dnode))  		return 0; @@ -157,8 +179,10 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)  		return -EHOSTUNREACH;  	}  	conn = kmalloc(sizeof(*conn), GFP_ATOMIC); -	if (!conn) -		return -EHOSTUNREACH; +	if (!conn) { +		err = -EHOSTUNREACH; +		goto exit; +	}  	conn->peer_node = dnode;  	conn->port = port;  	conn->peer_port = peer_port; @@ -166,7 +190,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)  	tipc_node_lock(node);  	list_add_tail(&conn->list, &node->conn_sks);  	tipc_node_unlock(node); -	return 0; +exit: +	tipc_node_put(node); +	return err;  }  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) @@ -189,6 +215,7 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)  		kfree(conn);  	}  	tipc_node_unlock(node); +	tipc_node_put(node);  }  /** @@ -227,8 +254,8 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  	active[0] = active[1] = l_ptr;  exit:  	/* Leave room for changeover header when returning 'mtu' to users: */ -	n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; -	n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; +	n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; +	n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;  }  /** @@ -292,11 +319,10 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  	/* Leave room for changeover header when returning 'mtu' to users: */  	if (active[0]) { -		n_ptr->act_mtus[0] = active[0]->max_pkt - INT_H_SIZE; -		n_ptr->act_mtus[1] = active[1]->max_pkt - INT_H_SIZE; +		n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; +		n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;  		return;  	} -  	/* Loopback link went down? No fragmentation needed from now on. */  	if (n_ptr->addr == tn->own_addr) {  		n_ptr->act_mtus[0] = MAX_MSG_SIZE; @@ -354,7 +380,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)  	/* Flush broadcast link info associated with lost node */  	if (n_ptr->bclink.recv_permitted) { -		__skb_queue_purge(&n_ptr->bclink.deferred_queue); +		__skb_queue_purge(&n_ptr->bclink.deferdq);  		if (n_ptr->bclink.reasm_buf) {  			kfree_skb(n_ptr->bclink.reasm_buf); @@ -367,18 +393,17 @@ static void node_lost_contact(struct tipc_node *n_ptr)  		n_ptr->bclink.recv_permitted = false;  	} -	/* Abort link changeover */ +	/* Abort any ongoing link failover */  	for (i = 0; i < MAX_BEARERS; i++) {  		struct tipc_link *l_ptr = n_ptr->links[i];  		if (!l_ptr)  			continue; -		l_ptr->reset_checkpoint = l_ptr->next_in_no; -		l_ptr->exp_msg_count = 0; +		l_ptr->flags &= ~LINK_FAILINGOVER; +		l_ptr->failover_checkpt = 0; +		l_ptr->failover_pkts = 0; +		kfree_skb(l_ptr->failover_skb); +		l_ptr->failover_skb = NULL;  		tipc_link_reset_fragments(l_ptr); - -		/* Link marked for deletion after failover? => do it now */ -		if (l_ptr->flags & LINK_STOPPED) -			tipc_link_delete(l_ptr);  	}  	n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; @@ -417,19 +442,25 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,  			   char *linkname, size_t len)  {  	struct tipc_link *link; +	int err = -EINVAL;  	struct tipc_node *node = tipc_node_find(net, addr); -	if ((bearer_id >= MAX_BEARERS) || !node) -		return -EINVAL; +	if (!node) +		return err; + +	if (bearer_id >= MAX_BEARERS) +		goto exit; +  	tipc_node_lock(node);  	link = node->links[bearer_id];  	if (link) {  		strncpy(linkname, link->name, len); -		tipc_node_unlock(node); -		return 0; +		err = 0;  	} +exit:  	tipc_node_unlock(node); -	return -EINVAL; +	tipc_node_put(node); +	return err;  }  void tipc_node_unlock(struct tipc_node *node) @@ -459,7 +490,7 @@ void tipc_node_unlock(struct tipc_node *node)  				TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |  				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |  				TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | -				TIPC_NAMED_MSG_EVT); +				TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET);  	spin_unlock_bh(&node->lock); @@ -488,6 +519,9 @@ void tipc_node_unlock(struct tipc_node *node)  	if (flags & TIPC_BCAST_MSG_EVT)  		tipc_bclink_input(net); + +	if (flags & TIPC_BCAST_RESET) +		tipc_link_reset_all(node);  }  /* Caller should hold node lock for the passed node */ @@ -542,17 +576,21 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)  	msg.seq = cb->nlh->nlmsg_seq;  	rcu_read_lock(); - -	if (last_addr && !tipc_node_find(net, last_addr)) { -		rcu_read_unlock(); -		/* We never set seq or call nl_dump_check_consistent() this -		 * means that setting prev_seq here will cause the consistence -		 * check to fail in the netlink callback handler. Resulting in -		 * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if -		 * the node state changed while we released the lock. -		 */ -		cb->prev_seq = 1; -		return -EPIPE; +	if (last_addr) { +		node = tipc_node_find(net, last_addr); +		if (!node) { +			rcu_read_unlock(); +			/* We never set seq or call nl_dump_check_consistent() +			 * this means that setting prev_seq here will cause the +			 * consistence check to fail in the netlink callback +			 * handler. Resulting in the NLMSG_DONE message having +			 * the NLM_F_DUMP_INTR flag set if the node state +			 * changed while we released the lock. +			 */ +			cb->prev_seq = 1; +			return -EPIPE; +		} +		tipc_node_put(node);  	}  	list_for_each_entry_rcu(node, &tn->node_list, list) { diff --git a/net/tipc/node.h b/net/tipc/node.h index 3d18c66b7f78..02d5c20dc551 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -64,7 +64,8 @@ enum {  	TIPC_NOTIFY_LINK_UP		= (1 << 6),  	TIPC_NOTIFY_LINK_DOWN		= (1 << 7),  	TIPC_NAMED_MSG_EVT		= (1 << 8), -	TIPC_BCAST_MSG_EVT		= (1 << 9) +	TIPC_BCAST_MSG_EVT		= (1 << 9), +	TIPC_BCAST_RESET		= (1 << 10)  };  /** @@ -84,7 +85,7 @@ struct tipc_node_bclink {  	u32 last_sent;  	u32 oos_state;  	u32 deferred_size; -	struct sk_buff_head deferred_queue; +	struct sk_buff_head deferdq;  	struct sk_buff *reasm_buf;  	int inputq_map;  	bool recv_permitted; @@ -93,6 +94,7 @@ struct tipc_node_bclink {  /**   * struct tipc_node - TIPC node structure   * @addr: network address of node + * @ref: reference counter to node object   * @lock: spinlock governing access to structure   * @net: the applicable net namespace   * @hash: links to adjacent nodes in unsorted hash chain @@ -106,6 +108,7 @@ struct tipc_node_bclink {   * @list: links to adjacent nodes in sorted list of cluster's nodes   * @working_links: number of working links to node (both active and standby)   * @link_cnt: number of links to node + * @capabilities: bitmap, indicating peer node's functional capabilities   * @signature: node instance identifier   * @link_id: local and remote bearer ids of changing link, if any   * @publ_list: list of publications @@ -113,6 +116,7 @@ struct tipc_node_bclink {   */  struct tipc_node {  	u32 addr; +	struct kref kref;  	spinlock_t lock;  	struct net *net;  	struct hlist_node hash; @@ -125,7 +129,8 @@ struct tipc_node {  	struct tipc_node_bclink bclink;  	struct list_head list;  	int link_cnt; -	int working_links; +	u16 working_links; +	u16 capabilities;  	u32 signature;  	u32 link_id;  	struct list_head publ_list; @@ -134,6 +139,7 @@ struct tipc_node {  };  struct tipc_node *tipc_node_find(struct net *net, u32 addr); +void tipc_node_put(struct tipc_node *node);  struct tipc_node *tipc_node_create(struct net *net, u32 addr);  void tipc_node_stop(struct net *net);  void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); @@ -168,10 +174,12 @@ static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)  	node = tipc_node_find(net, addr); -	if (likely(node)) +	if (likely(node)) {  		mtu = node->act_mtus[selector & 1]; -	else +		tipc_node_put(node); +	} else {  		mtu = MAX_MSG_SIZE; +	}  	return mtu;  } diff --git a/net/tipc/server.c b/net/tipc/server.c index eadd4ed45905..ab6183cdb121 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -37,11 +37,13 @@  #include "core.h"  #include "socket.h"  #include <net/sock.h> +#include <linux/module.h>  /* Number of messages to send before rescheduling */  #define MAX_SEND_MSG_COUNT	25  #define MAX_RECV_MSG_COUNT	25  #define CF_CONNECTED		1 +#define CF_SERVER		2  #define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data) @@ -88,9 +90,19 @@ static void tipc_clean_outqueues(struct tipc_conn *con);  static void tipc_conn_kref_release(struct kref *kref)  {  	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); +	struct sockaddr_tipc *saddr = con->server->saddr; +	struct socket *sock = con->sock; +	struct sock *sk; -	if (con->sock) { -		tipc_sock_release_local(con->sock); +	if (sock) { +		sk = sock->sk; +		if (test_bit(CF_SERVER, &con->flags)) { +			__module_get(sock->ops->owner); +			__module_get(sk->sk_prot_creator->owner); +		} +		saddr->scope = -TIPC_NODE_SCOPE; +		kernel_bind(sock, (struct sockaddr *)saddr, sizeof(*saddr)); +		sk_release_kernel(sk);  		con->sock = NULL;  	} @@ -281,7 +293,7 @@ static int tipc_accept_from_sock(struct tipc_conn *con)  	struct tipc_conn *newcon;  	int ret; -	ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK); +	ret = kernel_accept(sock, &newsock, O_NONBLOCK);  	if (ret < 0)  		return ret; @@ -309,9 +321,12 @@ static struct socket *tipc_create_listen_sock(struct tipc_conn *con)  	struct socket *sock = NULL;  	int ret; -	ret = tipc_sock_create_local(s->net, s->type, &sock); +	ret = sock_create_kern(AF_TIPC, SOCK_SEQPACKET, 0, &sock);  	if (ret < 0)  		return NULL; + +	sk_change_net(sock->sk, s->net); +  	ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,  				(char *)&s->imp, sizeof(s->imp));  	if (ret < 0) @@ -337,11 +352,31 @@ static struct socket *tipc_create_listen_sock(struct tipc_conn *con)  		pr_err("Unknown socket type %d\n", s->type);  		goto create_err;  	} + +	/* As server's listening socket owner and creator is the same module, +	 * we have to decrease TIPC module reference count to guarantee that +	 * it remains zero after the server socket is created, otherwise, +	 * executing "rmmod" command is unable to make TIPC module deleted +	 * after TIPC module is inserted successfully. +	 * +	 * However, the reference count is ever increased twice in +	 * sock_create_kern(): one is to increase the reference count of owner +	 * of TIPC socket's proto_ops struct; another is to increment the +	 * reference count of owner of TIPC proto struct. Therefore, we must +	 * decrement the module reference count twice to ensure that it keeps +	 * zero after server's listening socket is created. Of course, we +	 * must bump the module reference count twice as well before the socket +	 * is closed. +	 */ +	module_put(sock->ops->owner); +	module_put(sock->sk->sk_prot_creator->owner); +	set_bit(CF_SERVER, &con->flags); +  	return sock;  create_err: -	sock_release(sock); -	con->sock = NULL; +	kernel_sock_shutdown(sock, SHUT_RDWR); +	sk_release_kernel(sock->sk);  	return NULL;  } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index b4d4467d0bb0..ee90d74d7516 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -35,7 +35,6 @@   */  #include <linux/rhashtable.h> -#include <linux/jhash.h>  #include "core.h"  #include "name_table.h"  #include "node.h" @@ -74,6 +73,7 @@   * @link_cong: non-zero if owner must sleep because of link congestion   * @sent_unacked: # messages sent by socket, and not yet acked by peer   * @rcv_unacked: # messages read by user, but not yet acked back to peer + * @remote: 'connected' peer for dgram/rdm   * @node: hash table node   * @rcu: rcu struct for tipc_sock   */ @@ -96,6 +96,7 @@ struct tipc_sock {  	bool link_cong;  	uint sent_unacked;  	uint rcv_unacked; +	struct sockaddr_tipc remote;  	struct rhash_head node;  	struct rcu_head rcu;  }; @@ -114,13 +115,14 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);  static int tipc_sk_insert(struct tipc_sock *tsk);  static void tipc_sk_remove(struct tipc_sock *tsk); +static int __tipc_send_stream(struct socket *sock, struct msghdr *m, +			      size_t dsz); +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);  static const struct proto_ops packet_ops;  static const struct proto_ops stream_ops;  static const struct proto_ops msg_ops; -  static struct proto tipc_proto; -static struct proto tipc_proto_kern;  static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {  	[TIPC_NLA_SOCK_UNSPEC]		= { .type = NLA_UNSPEC }, @@ -130,6 +132,8 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {  	[TIPC_NLA_SOCK_HAS_PUBL]	= { .type = NLA_FLAG }  }; +static const struct rhashtable_params tsk_rht_params; +  /*   * Revised TIPC socket locking policy:   * @@ -338,11 +342,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	}  	/* Allocate socket's protocol area */ -	if (!kern) -		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); -	else -		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern); - +	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);  	if (sk == NULL)  		return -ENOMEM; @@ -380,75 +380,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock,  	return 0;  } -/** - * tipc_sock_create_local - create TIPC socket from inside TIPC module - * @type: socket type - SOCK_RDM or SOCK_SEQPACKET - * - * We cannot use sock_creat_kern here because it bumps module user count. - * Since socket owner and creator is the same module we must make sure - * that module count remains zero for module local sockets, otherwise - * we cannot do rmmod. - * - * Returns 0 on success, errno otherwise - */ -int tipc_sock_create_local(struct net *net, int type, struct socket **res) -{ -	int rc; - -	rc = sock_create_lite(AF_TIPC, type, 0, res); -	if (rc < 0) { -		pr_err("Failed to create kernel socket\n"); -		return rc; -	} -	tipc_sk_create(net, *res, 0, 1); - -	return 0; -} - -/** - * tipc_sock_release_local - release socket created by tipc_sock_create_local - * @sock: the socket to be released. - * - * Module reference count is not incremented when such sockets are created, - * so we must keep it from being decremented when they are released. - */ -void tipc_sock_release_local(struct socket *sock) -{ -	tipc_release(sock); -	sock->ops = NULL; -	sock_release(sock); -} - -/** - * tipc_sock_accept_local - accept a connection on a socket created - * with tipc_sock_create_local. Use this function to avoid that - * module reference count is inadvertently incremented. - * - * @sock:    the accepting socket - * @newsock: reference to the new socket to be created - * @flags:   socket flags - */ - -int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, -			   int flags) -{ -	struct sock *sk = sock->sk; -	int ret; - -	ret = sock_create_lite(sk->sk_family, sk->sk_type, -			       sk->sk_protocol, newsock); -	if (ret < 0) -		return ret; - -	ret = tipc_accept(sock, *newsock, flags); -	if (ret < 0) { -		sock_release(*newsock); -		return ret; -	} -	(*newsock)->ops = sock->ops; -	return ret; -} -  static void tipc_sk_callback(struct rcu_head *head)  {  	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu); @@ -892,7 +823,6 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)  /**   * tipc_sendmsg - send message in connectionless manner - * @iocb: if NULL, indicates that socket lock is already held   * @sock: socket structure   * @m: message to send   * @dsz: amount of user data to be sent @@ -904,9 +834,21 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)   *   * Returns the number of bytes sent on success, or errno otherwise   */ -static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, +static int tipc_sendmsg(struct socket *sock,  			struct msghdr *m, size_t dsz)  { +	struct sock *sk = sock->sk; +	int ret; + +	lock_sock(sk); +	ret = __tipc_sendmsg(sock, m, dsz); +	release_sock(sk); + +	return ret; +} + +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +{  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); @@ -915,49 +857,40 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,  	u32 dnode, dport;  	struct sk_buff_head *pktchain = &sk->sk_write_queue;  	struct sk_buff *skb; -	struct tipc_name_seq *seq = &dest->addr.nameseq; +	struct tipc_name_seq *seq;  	struct iov_iter save;  	u32 mtu;  	long timeo;  	int rc; -	if (unlikely(!dest)) -		return -EDESTADDRREQ; - -	if (unlikely((m->msg_namelen < sizeof(*dest)) || -		     (dest->family != AF_TIPC))) -		return -EINVAL; -  	if (dsz > TIPC_MAX_USER_MSG_SIZE)  		return -EMSGSIZE; - -	if (iocb) -		lock_sock(sk); - +	if (unlikely(!dest)) { +		if (tsk->connected && sock->state == SS_READY) +			dest = &tsk->remote; +		else +			return -EDESTADDRREQ; +	} else if (unlikely(m->msg_namelen < sizeof(*dest)) || +		   dest->family != AF_TIPC) { +		return -EINVAL; +	}  	if (unlikely(sock->state != SS_READY)) { -		if (sock->state == SS_LISTENING) { -			rc = -EPIPE; -			goto exit; -		} -		if (sock->state != SS_UNCONNECTED) { -			rc = -EISCONN; -			goto exit; -		} -		if (tsk->published) { -			rc = -EOPNOTSUPP; -			goto exit; -		} +		if (sock->state == SS_LISTENING) +			return -EPIPE; +		if (sock->state != SS_UNCONNECTED) +			return -EISCONN; +		if (tsk->published) +			return -EOPNOTSUPP;  		if (dest->addrtype == TIPC_ADDR_NAME) {  			tsk->conn_type = dest->addr.name.name.type;  			tsk->conn_instance = dest->addr.name.name.instance;  		}  	} - +	seq = &dest->addr.nameseq;  	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);  	if (dest->addrtype == TIPC_ADDR_MCAST) { -		rc = tipc_sendmcast(sock, seq, m, dsz, timeo); -		goto exit; +		return tipc_sendmcast(sock, seq, m, dsz, timeo);  	} else if (dest->addrtype == TIPC_ADDR_NAME) {  		u32 type = dest->addr.name.name.type;  		u32 inst = dest->addr.name.name.instance; @@ -972,10 +905,8 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,  		dport = tipc_nametbl_translate(net, type, inst, &dnode);  		msg_set_destnode(mhdr, dnode);  		msg_set_destport(mhdr, dport); -		if (unlikely(!dport && !dnode)) { -			rc = -EHOSTUNREACH; -			goto exit; -		} +		if (unlikely(!dport && !dnode)) +			return -EHOSTUNREACH;  	} else if (dest->addrtype == TIPC_ADDR_ID) {  		dnode = dest->addr.id.node;  		msg_set_type(mhdr, TIPC_DIRECT_MSG); @@ -990,7 +921,7 @@ new_mtu:  	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);  	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);  	if (rc < 0) -		goto exit; +		return rc;  	do {  		skb = skb_peek(pktchain); @@ -1013,9 +944,6 @@ new_mtu:  		if (rc)  			__skb_queue_purge(pktchain);  	} while (!rc); -exit: -	if (iocb) -		release_sock(sk);  	return rc;  } @@ -1052,7 +980,6 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)  /**   * tipc_send_stream - send stream-oriented data - * @iocb: (unused)   * @sock: socket structure   * @m: data to send   * @dsz: total length of data to be transmitted @@ -1062,8 +989,19 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)   * Returns the number of bytes sent on success (or partial success),   * or errno if no data sent   */ -static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, -			    struct msghdr *m, size_t dsz) +static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +{ +	struct sock *sk = sock->sk; +	int ret; + +	lock_sock(sk); +	ret = __tipc_send_stream(sock, m, dsz); +	release_sock(sk); + +	return ret; +} + +static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)  {  	struct sock *sk = sock->sk;  	struct net *net = sock_net(sk); @@ -1080,7 +1018,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,  	/* Handle implied connection establishment */  	if (unlikely(dest)) { -		rc = tipc_sendmsg(iocb, sock, m, dsz); +		rc = __tipc_sendmsg(sock, m, dsz);  		if (dsz && (dsz == rc))  			tsk->sent_unacked = 1;  		return rc; @@ -1088,15 +1026,11 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,  	if (dsz > (uint)INT_MAX)  		return -EMSGSIZE; -	if (iocb) -		lock_sock(sk); -  	if (unlikely(sock->state != SS_CONNECTED)) {  		if (sock->state == SS_DISCONNECTING) -			rc = -EPIPE; +			return -EPIPE;  		else -			rc = -ENOTCONN; -		goto exit; +			return -ENOTCONN;  	}  	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); @@ -1108,7 +1042,7 @@ next:  	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);  	rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);  	if (unlikely(rc < 0)) -		goto exit; +		return rc;  	do {  		if (likely(!tsk_conn_cong(tsk))) {  			rc = tipc_link_xmit(net, pktchain, dnode, portid); @@ -1133,15 +1067,12 @@ next:  		if (rc)  			__skb_queue_purge(pktchain);  	} while (!rc); -exit: -	if (iocb) -		release_sock(sk); +  	return sent ? sent : rc;  }  /**   * tipc_send_packet - send a connection-oriented message - * @iocb: if NULL, indicates that socket lock is already held   * @sock: socket structure   * @m: message to send   * @dsz: length of data to be transmitted @@ -1150,13 +1081,12 @@ exit:   *   * Returns the number of bytes sent on success, or errno otherwise   */ -static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, -			    struct msghdr *m, size_t dsz) +static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)  {  	if (dsz > TIPC_MAX_USER_MSG_SIZE)  		return -EMSGSIZE; -	return tipc_send_stream(iocb, sock, m, dsz); +	return tipc_send_stream(sock, m, dsz);  }  /* tipc_sk_finish_conn - complete the setup of a connection @@ -1317,12 +1247,12 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)  		err = 0;  		if (!skb_queue_empty(&sk->sk_receive_queue))  			break; -		err = sock_intr_errno(timeo); -		if (signal_pending(current)) -			break;  		err = -EAGAIN;  		if (!timeo)  			break; +		err = sock_intr_errno(timeo); +		if (signal_pending(current)) +			break;  	}  	finish_wait(sk_sleep(sk), &wait);  	*timeop = timeo; @@ -1331,7 +1261,6 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)  /**   * tipc_recvmsg - receive packet-oriented message - * @iocb: (unused)   * @m: descriptor for message info   * @buf_len: total size of user buffer area   * @flags: receive flags @@ -1341,8 +1270,8 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)   *   * Returns size of returned message data, errno otherwise   */ -static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, -			struct msghdr *m, size_t buf_len, int flags) +static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, +			int flags)  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); @@ -1426,7 +1355,6 @@ exit:  /**   * tipc_recv_stream - receive stream-oriented data - * @iocb: (unused)   * @m: descriptor for message info   * @buf_len: total size of user buffer area   * @flags: receive flags @@ -1436,8 +1364,8 @@ exit:   *   * Returns size of returned message data, errno otherwise   */ -static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, -			    struct msghdr *m, size_t buf_len, int flags) +static int tipc_recv_stream(struct socket *sock, struct msghdr *m, +			    size_t buf_len, int flags)  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); @@ -1909,17 +1837,26 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,  			int destlen, int flags)  {  	struct sock *sk = sock->sk; +	struct tipc_sock *tsk = tipc_sk(sk);  	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;  	struct msghdr m = {NULL,}; -	long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; +	long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;  	socket_state previous; -	int res; +	int res = 0;  	lock_sock(sk); -	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */ +	/* DGRAM/RDM connect(), just save the destaddr */  	if (sock->state == SS_READY) { -		res = -EOPNOTSUPP; +		if (dst->family == AF_UNSPEC) { +			memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc)); +			tsk->connected = 0; +		} else if (destlen != sizeof(struct sockaddr_tipc)) { +			res = -EINVAL; +		} else { +			memcpy(&tsk->remote, dest, destlen); +			tsk->connected = 1; +		}  		goto exit;  	} @@ -1947,7 +1884,7 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,  		if (!timeout)  			m.msg_flags = MSG_DONTWAIT; -		res = tipc_sendmsg(NULL, sock, &m, 0); +		res = __tipc_sendmsg(sock, &m, 0);  		if ((res < 0) && (res != -EWOULDBLOCK))  			goto exit; @@ -2027,12 +1964,12 @@ static int tipc_wait_for_accept(struct socket *sock, long timeo)  		err = -EINVAL;  		if (sock->state != SS_LISTENING)  			break; -		err = sock_intr_errno(timeo); -		if (signal_pending(current)) -			break;  		err = -EAGAIN;  		if (!timeo)  			break; +		err = sock_intr_errno(timeo); +		if (signal_pending(current)) +			break;  	}  	finish_wait(sk_sleep(sk), &wait);  	return err; @@ -2103,7 +2040,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  		struct msghdr m = {NULL,};  		tsk_advance_rx_queue(sk); -		tipc_send_packet(NULL, new_sock, &m, 0); +		__tipc_send_stream(new_sock, &m, 0);  	} else {  		__skb_dequeue(&sk->sk_receive_queue);  		__skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2154,7 +2091,6 @@ restart:  					     TIPC_CONN_SHUTDOWN))  				tipc_link_xmit_skb(net, skb, dnode,  						   tsk->portid); -			tipc_node_remove_conn(net, dnode, tsk->portid);  		} else {  			dnode = tsk_peer_node(tsk); @@ -2312,7 +2248,7 @@ static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)  	struct tipc_sock *tsk;  	rcu_read_lock(); -	tsk = rhashtable_lookup(&tn->sk_rht, &portid); +	tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);  	if (tsk)  		sock_hold(&tsk->sk);  	rcu_read_unlock(); @@ -2334,7 +2270,8 @@ static int tipc_sk_insert(struct tipc_sock *tsk)  			portid = TIPC_MIN_PORT;  		tsk->portid = portid;  		sock_hold(&tsk->sk); -		if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node)) +		if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node, +						   tsk_rht_params))  			return 0;  		sock_put(&tsk->sk);  	} @@ -2347,26 +2284,27 @@ static void tipc_sk_remove(struct tipc_sock *tsk)  	struct sock *sk = &tsk->sk;  	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id); -	if (rhashtable_remove(&tn->sk_rht, &tsk->node)) { +	if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {  		WARN_ON(atomic_read(&sk->sk_refcnt) == 1);  		__sock_put(sk);  	}  } +static const struct rhashtable_params tsk_rht_params = { +	.nelem_hint = 192, +	.head_offset = offsetof(struct tipc_sock, node), +	.key_offset = offsetof(struct tipc_sock, portid), +	.key_len = sizeof(u32), /* portid */ +	.max_size = 1048576, +	.min_size = 256, +	.automatic_shrinking = true, +}; +  int tipc_sk_rht_init(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); -	struct rhashtable_params rht_params = { -		.nelem_hint = 192, -		.head_offset = offsetof(struct tipc_sock, node), -		.key_offset = offsetof(struct tipc_sock, portid), -		.key_len = sizeof(u32), /* portid */ -		.hashfn = jhash, -		.max_shift = 20, /* 1M */ -		.min_shift = 8,  /* 256 */ -	}; -	return rhashtable_init(&tn->sk_rht, &rht_params); +	return rhashtable_init(&tn->sk_rht, &tsk_rht_params);  }  void tipc_sk_rht_destroy(struct net *net) @@ -2609,12 +2547,6 @@ static struct proto tipc_proto = {  	.sysctl_rmem	= sysctl_tipc_rmem  }; -static struct proto tipc_proto_kern = { -	.name		= "TIPC", -	.obj_size	= sizeof(struct tipc_sock), -	.sysctl_rmem	= sysctl_tipc_rmem -}; -  /**   * tipc_socket_init - initialize TIPC socket interface   * diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 238f1b7bd9bd..bf6551389522 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -44,10 +44,6 @@  				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))  int tipc_socket_init(void);  void tipc_socket_stop(void); -int tipc_sock_create_local(struct net *net, int type, struct socket **res); -void tipc_sock_release_local(struct socket *sock); -int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, -			   int flags);  int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);  void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		       struct sk_buff_head *inputq); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 72c339e432aa..1c147c869c2e 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -162,19 +162,6 @@ static void subscr_del(struct tipc_subscription *sub)  	atomic_dec(&tn->subscription_count);  } -/** - * subscr_terminate - terminate communication with a subscriber - * - * Note: Must call it in process context since it might sleep. - */ -static void subscr_terminate(struct tipc_subscription *sub) -{ -	struct tipc_subscriber *subscriber = sub->subscriber; -	struct tipc_net *tn = net_generic(sub->net, tipc_net_id); - -	tipc_conn_terminate(tn->topsrv, subscriber->conid); -} -  static void subscr_release(struct tipc_subscriber *subscriber)  {  	struct tipc_subscription *sub; @@ -312,16 +299,14 @@ static void subscr_conn_msg_event(struct net *net, int conid,  {  	struct tipc_subscriber *subscriber = usr_data;  	struct tipc_subscription *sub = NULL; +	struct tipc_net *tn = net_generic(net, tipc_net_id);  	spin_lock_bh(&subscriber->lock); -	if (subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, -			     &sub) < 0) { -		spin_unlock_bh(&subscriber->lock); -		subscr_terminate(sub); -		return; -	} +	subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, &sub);  	if (sub)  		tipc_nametbl_subscribe(sub); +	else +		tipc_conn_terminate(tn->topsrv, subscriber->conid);  	spin_unlock_bh(&subscriber->lock);  } diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c new file mode 100644 index 000000000000..66deebc66aa1 --- /dev/null +++ b/net/tipc/udp_media.c @@ -0,0 +1,448 @@ +/* net/tipc/udp_media.c: IP bearer support for TIPC + * + * Copyright (c) 2015, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + *    notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + *    notice, this list of conditions and the following disclaimer in the + *    documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + *    contributors may be used to endorse or promote products derived from + *    this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <linux/socket.h> +#include <linux/ip.h> +#include <linux/udp.h> +#include <linux/inet.h> +#include <linux/inetdevice.h> +#include <linux/igmp.h> +#include <linux/kernel.h> +#include <linux/workqueue.h> +#include <linux/list.h> +#include <net/sock.h> +#include <net/ip.h> +#include <net/udp_tunnel.h> +#include <net/addrconf.h> +#include <linux/tipc_netlink.h> +#include "core.h" +#include "bearer.h" + +/* IANA assigned UDP port */ +#define UDP_PORT_DEFAULT	6118 + +static const struct nla_policy tipc_nl_udp_policy[TIPC_NLA_UDP_MAX + 1] = { +	[TIPC_NLA_UDP_UNSPEC]	= {.type = NLA_UNSPEC}, +	[TIPC_NLA_UDP_LOCAL]	= {.type = NLA_BINARY, +				   .len = sizeof(struct sockaddr_storage)}, +	[TIPC_NLA_UDP_REMOTE]	= {.type = NLA_BINARY, +				   .len = sizeof(struct sockaddr_storage)}, +}; + +/** + * struct udp_media_addr - IP/UDP addressing information + * + * This is the bearer level originating address used in neighbor discovery + * messages, and all fields should be in network byte order + */ +struct udp_media_addr { +	__be16	proto; +	__be16	udp_port; +	union { +		struct in_addr ipv4; +		struct in6_addr ipv6; +	}; +}; + +/** + * struct udp_bearer - ip/udp bearer data structure + * @bearer:	associated generic tipc bearer + * @ubsock:	bearer associated socket + * @ifindex:	local address scope + * @work:	used to schedule deferred work on a bearer + */ +struct udp_bearer { +	struct tipc_bearer __rcu *bearer; +	struct socket *ubsock; +	u32 ifindex; +	struct work_struct work; +}; + +/* udp_media_addr_set - convert a ip/udp address to a TIPC media address */ +static void tipc_udp_media_addr_set(struct tipc_media_addr *addr, +				    struct udp_media_addr *ua) +{ +	memset(addr, 0, sizeof(struct tipc_media_addr)); +	addr->media_id = TIPC_MEDIA_TYPE_UDP; +	memcpy(addr->value, ua, sizeof(struct udp_media_addr)); +	if (ntohs(ua->proto) == ETH_P_IP) { +		if (ipv4_is_multicast(ua->ipv4.s_addr)) +			addr->broadcast = 1; +	} else if (ntohs(ua->proto) == ETH_P_IPV6) { +		if (ipv6_addr_type(&ua->ipv6) & IPV6_ADDR_MULTICAST) +			addr->broadcast = 1; +	} else { +		pr_err("Invalid UDP media address\n"); +	} +} + +/* tipc_udp_addr2str - convert ip/udp address to string */ +static int tipc_udp_addr2str(struct tipc_media_addr *a, char *buf, int size) +{ +	struct udp_media_addr *ua = (struct udp_media_addr *)&a->value; + +	if (ntohs(ua->proto) == ETH_P_IP) +		snprintf(buf, size, "%pI4:%u", &ua->ipv4, ntohs(ua->udp_port)); +	else if (ntohs(ua->proto) == ETH_P_IPV6) +		snprintf(buf, size, "%pI6:%u", &ua->ipv6, ntohs(ua->udp_port)); +	else +		pr_err("Invalid UDP media address\n"); +	return 0; +} + +/* tipc_udp_msg2addr - extract an ip/udp address from a TIPC ndisc message */ +static int tipc_udp_msg2addr(struct tipc_bearer *b, struct tipc_media_addr *a, +			     char *msg) +{ +	struct udp_media_addr *ua; + +	ua = (struct udp_media_addr *) (msg + TIPC_MEDIA_ADDR_OFFSET); +	if (msg[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_UDP) +		return -EINVAL; +	tipc_udp_media_addr_set(a, ua); +	return 0; +} + +/* tipc_udp_addr2msg - write an ip/udp address to a TIPC ndisc message */ +static int tipc_udp_addr2msg(char *msg, struct tipc_media_addr *a) +{ +	memset(msg, 0, TIPC_MEDIA_INFO_SIZE); +	msg[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_UDP; +	memcpy(msg + TIPC_MEDIA_ADDR_OFFSET, a->value, +	       sizeof(struct udp_media_addr)); +	return 0; +} + +/* tipc_send_msg - enqueue a send request */ +static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, +			     struct tipc_bearer *b, +			     struct tipc_media_addr *dest) +{ +	int ttl, err = 0; +	struct udp_bearer *ub; +	struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value; +	struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value; +	struct sk_buff *clone; +	struct rtable *rt; + +	clone = skb_clone(skb, GFP_ATOMIC); +	skb_set_inner_protocol(clone, htons(ETH_P_TIPC)); +	ub = rcu_dereference_rtnl(b->media_ptr); +	if (!ub) { +		err = -ENODEV; +		goto tx_error; +	} +	if (dst->proto == htons(ETH_P_IP)) { +		struct flowi4 fl = { +			.daddr = dst->ipv4.s_addr, +			.saddr = src->ipv4.s_addr, +			.flowi4_mark = clone->mark, +			.flowi4_proto = IPPROTO_UDP +		}; +		rt = ip_route_output_key(net, &fl); +		if (IS_ERR(rt)) { +			err = PTR_ERR(rt); +			goto tx_error; +		} +		ttl = ip4_dst_hoplimit(&rt->dst); +		err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, clone, +					  src->ipv4.s_addr, +					  dst->ipv4.s_addr, 0, ttl, 0, +					  src->udp_port, dst->udp_port, +					  false, true); +		if (err < 0) { +			ip_rt_put(rt); +			goto tx_error; +		} +#if IS_ENABLED(CONFIG_IPV6) +	} else { +		struct dst_entry *ndst; +		struct flowi6 fl6 = { +			.flowi6_oif = ub->ifindex, +			.daddr = dst->ipv6, +			.saddr = src->ipv6, +			.flowi6_proto = IPPROTO_UDP +		}; +		err = ipv6_stub->ipv6_dst_lookup(ub->ubsock->sk, &ndst, &fl6); +		if (err) +			goto tx_error; +		ttl = ip6_dst_hoplimit(ndst); +		err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, clone, +					   ndst->dev, &src->ipv6, +					   &dst->ipv6, 0, ttl, src->udp_port, +					   dst->udp_port, false); +#endif +	} +	return err; + +tx_error: +	kfree_skb(clone); +	return err; +} + +/* tipc_udp_recv - read data from bearer socket */ +static int tipc_udp_recv(struct sock *sk, struct sk_buff *skb) +{ +	struct udp_bearer *ub; +	struct tipc_bearer *b; + +	ub = rcu_dereference_sk_user_data(sk); +	if (!ub) { +		pr_err_ratelimited("Failed to get UDP bearer reference"); +		kfree_skb(skb); +		return 0; +	} + +	skb_pull(skb, sizeof(struct udphdr)); +	rcu_read_lock(); +	b = rcu_dereference_rtnl(ub->bearer); + +	if (b) { +		tipc_rcv(sock_net(sk), skb, b); +		rcu_read_unlock(); +		return 0; +	} +	rcu_read_unlock(); +	kfree_skb(skb); +	return 0; +} + +static int enable_mcast(struct udp_bearer *ub, struct udp_media_addr *remote) +{ +	int err = 0; +	struct ip_mreqn mreqn; +	struct sock *sk = ub->ubsock->sk; + +	if (ntohs(remote->proto) == ETH_P_IP) { +		if (!ipv4_is_multicast(remote->ipv4.s_addr)) +			return 0; +		mreqn.imr_multiaddr = remote->ipv4; +		mreqn.imr_ifindex = ub->ifindex; +		err = ip_mc_join_group(sk, &mreqn); +#if IS_ENABLED(CONFIG_IPV6) +	} else { +		if (!ipv6_addr_is_multicast(&remote->ipv6)) +			return 0; +		err = ipv6_stub->ipv6_sock_mc_join(sk, ub->ifindex, +						   &remote->ipv6); +#endif +	} +	return err; +} + +/** + * parse_options - build local/remote addresses from configuration + * @attrs:	netlink config data + * @ub:		UDP bearer instance + * @local:	local bearer IP address/port + * @remote:	peer or multicast IP/port + */ +static int parse_options(struct nlattr *attrs[], struct udp_bearer *ub, +			 struct udp_media_addr *local, +			 struct udp_media_addr *remote) +{ +	struct nlattr *opts[TIPC_NLA_UDP_MAX + 1]; +	struct sockaddr_storage *sa_local, *sa_remote; + +	if (!attrs[TIPC_NLA_BEARER_UDP_OPTS]) +		goto err; +	if (nla_parse_nested(opts, TIPC_NLA_UDP_MAX, +			     attrs[TIPC_NLA_BEARER_UDP_OPTS], +			     tipc_nl_udp_policy)) +		goto err; +	if (opts[TIPC_NLA_UDP_LOCAL] && opts[TIPC_NLA_UDP_REMOTE]) { +		sa_local = nla_data(opts[TIPC_NLA_UDP_LOCAL]); +		sa_remote = nla_data(opts[TIPC_NLA_UDP_REMOTE]); +	} else { +err: +		pr_err("Invalid UDP bearer configuration"); +		return -EINVAL; +	} +	if ((sa_local->ss_family & sa_remote->ss_family) == AF_INET) { +		struct sockaddr_in *ip4; + +		ip4 = (struct sockaddr_in *)sa_local; +		local->proto = htons(ETH_P_IP); +		local->udp_port = ip4->sin_port; +		local->ipv4.s_addr = ip4->sin_addr.s_addr; + +		ip4 = (struct sockaddr_in *)sa_remote; +		remote->proto = htons(ETH_P_IP); +		remote->udp_port = ip4->sin_port; +		remote->ipv4.s_addr = ip4->sin_addr.s_addr; +		return 0; + +#if IS_ENABLED(CONFIG_IPV6) +	} else if ((sa_local->ss_family & sa_remote->ss_family) == AF_INET6) { +		struct sockaddr_in6 *ip6; + +		ip6 = (struct sockaddr_in6 *)sa_local; +		local->proto = htons(ETH_P_IPV6); +		local->udp_port = ip6->sin6_port; +		local->ipv6 = ip6->sin6_addr; +		ub->ifindex = ip6->sin6_scope_id; + +		ip6 = (struct sockaddr_in6 *)sa_remote; +		remote->proto = htons(ETH_P_IPV6); +		remote->udp_port = ip6->sin6_port; +		remote->ipv6 = ip6->sin6_addr; +		return 0; +#endif +	} +	return -EADDRNOTAVAIL; +} + +/** + * tipc_udp_enable - callback to create a new udp bearer instance + * @net:	network namespace + * @b:		pointer to generic tipc_bearer + * @attrs:	netlink bearer configuration + * + * validate the bearer parameters and initialize the udp bearer + * rtnl_lock should be held + */ +static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, +			   struct nlattr *attrs[]) +{ +	int err = -EINVAL; +	struct udp_bearer *ub; +	struct udp_media_addr *remote; +	struct udp_media_addr local = {0}; +	struct udp_port_cfg udp_conf = {0}; +	struct udp_tunnel_sock_cfg tuncfg = {NULL}; + +	ub = kzalloc(sizeof(*ub), GFP_ATOMIC); +	if (!ub) +		return -ENOMEM; + +	remote = (struct udp_media_addr *)&b->bcast_addr.value; +	memset(remote, 0, sizeof(struct udp_media_addr)); +	err = parse_options(attrs, ub, &local, remote); +	if (err) +		goto err; + +	b->bcast_addr.media_id = TIPC_MEDIA_TYPE_UDP; +	b->bcast_addr.broadcast = 1; +	rcu_assign_pointer(b->media_ptr, ub); +	rcu_assign_pointer(ub->bearer, b); +	tipc_udp_media_addr_set(&b->addr, &local); +	if (local.proto == htons(ETH_P_IP)) { +		struct net_device *dev; + +		dev = __ip_dev_find(net, local.ipv4.s_addr, false); +		if (!dev) { +			err = -ENODEV; +			goto err; +		} +		udp_conf.family = AF_INET; +		udp_conf.local_ip.s_addr = htonl(INADDR_ANY); +		udp_conf.use_udp_checksums = false; +		ub->ifindex = dev->ifindex; +		b->mtu = dev->mtu - sizeof(struct iphdr) +			- sizeof(struct udphdr); +#if IS_ENABLED(CONFIG_IPV6) +	} else if (local.proto == htons(ETH_P_IPV6)) { +		udp_conf.family = AF_INET6; +		udp_conf.use_udp6_tx_checksums = true; +		udp_conf.use_udp6_rx_checksums = true; +		udp_conf.local_ip6 = in6addr_any; +		b->mtu = 1280; +#endif +	} else { +		err = -EAFNOSUPPORT; +		goto err; +	} +	udp_conf.local_udp_port = local.udp_port; +	err = udp_sock_create(net, &udp_conf, &ub->ubsock); +	if (err) +		goto err; +	tuncfg.sk_user_data = ub; +	tuncfg.encap_type = 1; +	tuncfg.encap_rcv = tipc_udp_recv; +	tuncfg.encap_destroy = NULL; +	setup_udp_tunnel_sock(net, ub->ubsock, &tuncfg); + +	if (enable_mcast(ub, remote)) +		goto err; +	return 0; +err: +	kfree(ub); +	return err; +} + +/* cleanup_bearer - break the socket/bearer association */ +static void cleanup_bearer(struct work_struct *work) +{ +	struct udp_bearer *ub = container_of(work, struct udp_bearer, work); + +	if (ub->ubsock) +		udp_tunnel_sock_release(ub->ubsock); +	synchronize_net(); +	kfree(ub); +} + +/* tipc_udp_disable - detach bearer from socket */ +static void tipc_udp_disable(struct tipc_bearer *b) +{ +	struct udp_bearer *ub; + +	ub = rcu_dereference_rtnl(b->media_ptr); +	if (!ub) { +		pr_err("UDP bearer instance not found\n"); +		return; +	} +	if (ub->ubsock) +		sock_set_flag(ub->ubsock->sk, SOCK_DEAD); +	RCU_INIT_POINTER(b->media_ptr, NULL); +	RCU_INIT_POINTER(ub->bearer, NULL); + +	/* sock_release need to be done outside of rtnl lock */ +	INIT_WORK(&ub->work, cleanup_bearer); +	schedule_work(&ub->work); +} + +struct tipc_media udp_media_info = { +	.send_msg	= tipc_udp_send_msg, +	.enable_media	= tipc_udp_enable, +	.disable_media	= tipc_udp_disable, +	.addr2str	= tipc_udp_addr2str, +	.addr2msg	= tipc_udp_addr2msg, +	.msg2addr	= tipc_udp_msg2addr, +	.priority	= TIPC_DEF_LINK_PRI, +	.tolerance	= TIPC_DEF_LINK_TOL, +	.window		= TIPC_DEF_LINK_WIN, +	.type_id	= TIPC_MEDIA_TYPE_UDP, +	.hwaddr_len	= 0, +	.name		= "udp" +}; |