diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/bcast.c | 61 | ||||
| -rw-r--r-- | net/tipc/bcast.h | 1 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 30 | ||||
| -rw-r--r-- | net/tipc/bearer.h | 3 | ||||
| -rw-r--r-- | net/tipc/core.h | 10 | ||||
| -rw-r--r-- | net/tipc/discover.c | 130 | ||||
| -rw-r--r-- | net/tipc/link.c | 2031 | ||||
| -rw-r--r-- | net/tipc/link.h | 109 | ||||
| -rw-r--r-- | net/tipc/msg.c | 86 | ||||
| -rw-r--r-- | net/tipc/msg.h | 112 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 6 | ||||
| -rw-r--r-- | net/tipc/netlink_compat.c | 2 | ||||
| -rw-r--r-- | net/tipc/node.c | 979 | ||||
| -rw-r--r-- | net/tipc/node.h | 84 | ||||
| -rw-r--r-- | net/tipc/socket.c | 386 | ||||
| -rw-r--r-- | net/tipc/socket.h | 2 | ||||
| -rw-r--r-- | net/tipc/udp_media.c | 3 | 
17 files changed, 2246 insertions, 1789 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index a816382fc8af..41042de3ae9b 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -170,6 +170,30 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)  }  /** + * bclink_prepare_wakeup - prepare users for wakeup after congestion + * @bcl: broadcast link + * @resultq: queue for users which can be woken up + * Move a number of waiting users, as permitted by available space in + * the send queue, from link wait queue to specified queue for wakeup + */ +static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq) +{ +	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; +	int imp, lim; +	struct sk_buff *skb, *tmp; + +	skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) { +		imp = TIPC_SKB_CB(skb)->chain_imp; +		lim = bcl->window + bcl->backlog[imp].limit; +		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; +		if ((pnd[imp] + bcl->backlog[imp].len) >= lim) +			continue; +		skb_unlink(skb, &bcl->wakeupq); +		skb_queue_tail(resultq, skb); +	} +} + +/**   * tipc_bclink_wakeup_users - wake up pending users   *   * Called with no locks taken @@ -177,8 +201,12 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)  void tipc_bclink_wakeup_users(struct net *net)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); +	struct tipc_link *bcl = tn->bcl; +	struct sk_buff_head resultq; -	tipc_sk_rcv(net, &tn->bclink->link.wakeupq); +	skb_queue_head_init(&resultq); +	bclink_prepare_wakeup(bcl, &resultq); +	tipc_sk_rcv(net, &resultq);  }  /** @@ -316,6 +344,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,  	}  } +void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr) +{ +	u16 last = msg_last_bcast(hdr); +	int mtyp = msg_type(hdr); + +	if (unlikely(msg_user(hdr) != LINK_PROTOCOL)) +		return; +	if (mtyp == STATE_MSG) { +		tipc_bclink_update_link_state(n, last); +		return; +	} +	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization, +	 * and transfer synch info in LINK_PROTOCOL messages. +	 */ +	if (tipc_node_is_up(n)) +		return; +	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG)) +		return; +	n->bclink.last_sent = last; +	n->bclink.last_in = last; +	n->bclink.oos_state = 0; +} +  /**   * bclink_peek_nack - monitor retransmission requests sent by other nodes   * @@ -358,10 +409,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)  	/* Prepare clone of message for local node */  	skb = tipc_msg_reassemble(list); -	if (unlikely(!skb)) { -		__skb_queue_purge(list); +	if (unlikely(!skb))  		return -EHOSTUNREACH; -	} +  	/* Broadcast to all nodes */  	if (likely(bclink)) {  		tipc_bclink_lock(net); @@ -413,7 +463,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)  	 * all nodes in the cluster don't ACK at the same time  	 */  	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { -		tipc_link_proto_xmit(node->active_links[node->addr & 1], +		tipc_link_proto_xmit(node_active_link(node, node->addr),  				     STATE_MSG, 0, 0, 0, 0);  		tn->bcl->stats.sent_acks++;  	} @@ -925,7 +975,6 @@ int tipc_bclink_init(struct net *net)  	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);  	bcl->bearer_id = MAX_BEARERS;  	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); -	bcl->state = WORKING_WORKING;  	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;  	msg_set_prevnode(bcl->pmsg, tn->own_addr);  	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 3c290a48f720..d74c69bcf60b 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);  int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);  int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);  void tipc_bclink_input(struct net *net); +void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);  #endif diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 00bc0e620532..ce9f7bfc0b92 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -343,7 +343,7 @@ restart:  static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)  {  	pr_info("Resetting bearer <%s>\n", b_ptr->name); -	tipc_link_delete_list(net, b_ptr->identity); +	tipc_node_delete_links(net, b_ptr->identity);  	tipc_disc_reset(net, b_ptr);  	return 0;  } @@ -361,7 +361,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr)  	pr_info("Disabling bearer <%s>\n", b_ptr->name);  	b_ptr->media->disable_media(b_ptr); -	tipc_link_delete_list(net, b_ptr->identity); +	tipc_node_delete_links(net, b_ptr->identity);  	if (b_ptr->link_req)  		tipc_disc_delete(b_ptr->link_req); @@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,  	rcu_read_unlock();  } +/* tipc_bearer_xmit() -send buffer to destination over bearer + */ +void tipc_bearer_xmit(struct net *net, u32 bearer_id, +		      struct sk_buff_head *xmitq, +		      struct tipc_media_addr *dst) +{ +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	struct tipc_bearer *b; +	struct sk_buff *skb, *tmp; + +	if (skb_queue_empty(xmitq)) +		return; + +	rcu_read_lock(); +	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); +	if (likely(b)) { +		skb_queue_walk_safe(xmitq, skb, tmp) { +			__skb_dequeue(xmitq); +			b->media->send_msg(net, skb, b, dst); +			/* Until we remove cloning in tipc_l2_send_msg(): */ +			kfree_skb(skb); +		} +	} +	rcu_read_unlock(); +} +  /**   * tipc_l2_rcv_msg - handle incoming TIPC message from an interface   * @buf: the received packet diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dc714d977768..6426f242f626 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);  void tipc_bearer_stop(struct net *net);  void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,  		      struct tipc_media_addr *dest); +void tipc_bearer_xmit(struct net *net, u32 bearer_id, +		      struct sk_buff_head *xmitq, +		      struct tipc_media_addr *dst);  #endif	/* _TIPC_BEARER_H */ diff --git a/net/tipc/core.h b/net/tipc/core.h index 0fcf133d5cb7..b96b41eabf12 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -109,6 +109,11 @@ struct tipc_net {  	atomic_t subscription_count;  }; +static inline struct tipc_net *tipc_net(struct net *net) +{ +	return net_generic(net, tipc_net_id); +} +  static inline u16 mod(u16 x)  {  	return x & 0xffffu; @@ -129,6 +134,11 @@ static inline int less(u16 left, u16 right)  	return less_eq(left, right) && (mod(right) != mod(left));  } +static inline int in_range(u16 val, u16 min, u16 max) +{ +	return !less(val, min) && !more(val, max); +} +  #ifdef CONFIG_SYSCTL  int tipc_register_sysctl(void);  void tipc_unregister_sysctl(void); diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 967e292f53c8..d14e0a4aa9af 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -35,7 +35,7 @@   */  #include "core.h" -#include "link.h" +#include "node.h"  #include "discover.h"  /* min delay during bearer start up */ @@ -120,30 +120,24 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,   * @buf: buffer containing message   * @bearer: bearer that message arrived on   */ -void tipc_disc_rcv(struct net *net, struct sk_buff *buf, +void tipc_disc_rcv(struct net *net, struct sk_buff *skb,  		   struct tipc_bearer *bearer)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id); -	struct tipc_node *node; -	struct tipc_link *link;  	struct tipc_media_addr maddr; -	struct sk_buff *rbuf; -	struct tipc_msg *msg = buf_msg(buf); -	u32 ddom = msg_dest_domain(msg); -	u32 onode = msg_prevnode(msg); -	u32 net_id = msg_bc_netid(msg); -	u32 mtyp = msg_type(msg); -	u32 signature = msg_node_sig(msg); -	u16 caps = msg_node_capabilities(msg); -	bool addr_match = false; -	bool sign_match = false; -	bool link_up = false; -	bool accept_addr = false; -	bool accept_sign = false; +	struct sk_buff *rskb; +	struct tipc_msg *hdr = buf_msg(skb); +	u32 ddom = msg_dest_domain(hdr); +	u32 onode = msg_prevnode(hdr); +	u32 net_id = msg_bc_netid(hdr); +	u32 mtyp = msg_type(hdr); +	u32 signature = msg_node_sig(hdr); +	u16 caps = msg_node_capabilities(hdr);  	bool respond = false; +	bool dupl_addr = false; -	bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg)); -	kfree_skb(buf); +	bearer->media->msg2addr(bearer, &maddr, msg_media_addr(hdr)); +	kfree_skb(skb);  	/* Ensure message from node is valid and communication is permitted */  	if (net_id != tn->net_id) @@ -165,102 +159,20 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,  	if (!tipc_in_scope(bearer->domain, onode))  		return; -	node = tipc_node_create(net, onode); -	if (!node) -		return; -	tipc_node_lock(node); -	node->capabilities = caps; -	link = node->links[bearer->identity]; - -	/* Prepare to validate requesting node's signature and media address */ -	sign_match = (signature == node->signature); -	addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr)); -	link_up = link && tipc_link_is_up(link); - - -	/* These three flags give us eight permutations: */ - -	if (sign_match && addr_match && link_up) { -		/* All is fine. Do nothing. */ -	} else if (sign_match && addr_match && !link_up) { -		/* Respond. The link will come up in due time */ -		respond = true; -	} else if (sign_match && !addr_match && link_up) { -		/* Peer has changed i/f address without rebooting. -		 * If so, the link will reset soon, and the next -		 * discovery will be accepted. So we can ignore it. -		 * It may also be an cloned or malicious peer having -		 * chosen the same node address and signature as an -		 * existing one. -		 * Ignore requests until the link goes down, if ever. -		 */ -		disc_dupl_alert(bearer, onode, &maddr); -	} else if (sign_match && !addr_match && !link_up) { -		/* Peer link has changed i/f address without rebooting. -		 * It may also be a cloned or malicious peer; we can't -		 * distinguish between the two. -		 * The signature is correct, so we must accept. -		 */ -		accept_addr = true; -		respond = true; -	} else if (!sign_match && addr_match && link_up) { -		/* Peer node rebooted. Two possibilities: -		 *  - Delayed re-discovery; this link endpoint has already -		 *    reset and re-established contact with the peer, before -		 *    receiving a discovery message from that node. -		 *    (The peer happened to receive one from this node first). -		 *  - The peer came back so fast that our side has not -		 *    discovered it yet. Probing from this side will soon -		 *    reset the link, since there can be no working link -		 *    endpoint at the peer end, and the link will re-establish. -		 *  Accept the signature, since it comes from a known peer. -		 */ -		accept_sign = true; -	} else if (!sign_match && addr_match && !link_up) { -		/*  The peer node has rebooted. -		 *  Accept signature, since it is a known peer. -		 */ -		accept_sign = true; -		respond = true; -	} else if (!sign_match && !addr_match && link_up) { -		/* Peer rebooted with new address, or a new/duplicate peer. -		 * Ignore until the link goes down, if ever. -		 */ +	tipc_node_check_dest(net, onode, bearer, caps, signature, +			     &maddr, &respond, &dupl_addr); +	if (dupl_addr)  		disc_dupl_alert(bearer, onode, &maddr); -	} else if (!sign_match && !addr_match && !link_up) { -		/* Peer rebooted with new address, or it is a new peer. -		 * Accept signature and address. -		*/ -		accept_sign = true; -		accept_addr = true; -		respond = true; -	} - -	if (accept_sign) -		node->signature = signature; - -	if (accept_addr) { -		if (!link) -			link = tipc_link_create(node, bearer, &maddr); -		if (link) { -			memcpy(&link->media_addr, &maddr, sizeof(maddr)); -			tipc_link_reset(link); -		} else { -			respond = false; -		} -	}  	/* Send response, if necessary */  	if (respond && (mtyp == DSC_REQ_MSG)) { -		rbuf = tipc_buf_acquire(MAX_H_SIZE); -		if (rbuf) { -			tipc_disc_init_msg(net, rbuf, DSC_RESP_MSG, bearer); -			tipc_bearer_send(net, bearer->identity, rbuf, &maddr); -			kfree_skb(rbuf); +		rskb = tipc_buf_acquire(MAX_H_SIZE); +		if (rskb) { +			tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); +			tipc_bearer_send(net, bearer->identity, rskb, &maddr); +			kfree_skb(rskb);  		}  	} -	tipc_node_unlock(node); -	tipc_node_put(node);  }  /** diff --git a/net/tipc/link.c b/net/tipc/link.c index eaa9fe54b4ae..75db07c78a69 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -48,9 +48,8 @@  /*   * Error message prefixes   */ -static const char *link_co_err = "Link changeover error, "; +static const char *link_co_err = "Link tunneling error, ";  static const char *link_rst_msg = "Resetting link "; -static const char *link_unk_evt = "Unknown link event ";  static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {  	[TIPC_NLA_LINK_UNSPEC]		= { .type = NLA_UNSPEC }, @@ -77,256 +76,413 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {  };  /* + * Interval between NACKs when packets arrive out of order + */ +#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2) +/*   * Out-of-range value for link session numbers   */ -#define INVALID_SESSION 0x10000 +#define WILDCARD_SESSION 0x10000 -/* - * Link state events: +/* Link FSM states:   */ -#define  STARTING_EVT    856384768	/* link processing trigger */ -#define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */ -#define  SILENCE_EVT     560817u	/* timer dicovered silence from peer */ +enum { +	LINK_ESTABLISHED     = 0xe, +	LINK_ESTABLISHING    = 0xe  << 4, +	LINK_RESET           = 0x1  << 8, +	LINK_RESETTING       = 0x2  << 12, +	LINK_PEER_RESET      = 0xd  << 16, +	LINK_FAILINGOVER     = 0xf  << 20, +	LINK_SYNCHING        = 0xc  << 24 +}; -/* - * State value stored in 'failover_pkts' +/* Link FSM state checking routines   */ -#define FIRST_FAILOVER 0xffffu - -static void link_handle_out_of_seq_msg(struct tipc_link *link, -				       struct sk_buff *skb); -static void tipc_link_proto_rcv(struct tipc_link *link, -				struct sk_buff *skb); -static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol); -static void link_state_event(struct tipc_link *l_ptr, u32 event); +static int link_is_up(struct tipc_link *l) +{ +	return l->state & (LINK_ESTABLISHED | LINK_SYNCHING); +} + +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, +			       struct sk_buff_head *xmitq); +static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, +				      u16 rcvgap, int tolerance, int priority, +				      struct sk_buff_head *xmitq);  static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str); -static void tipc_link_sync_xmit(struct tipc_link *l);  static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); -static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); -static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); -static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); -static void link_set_timer(struct tipc_link *link, unsigned long time); +  /* - *  Simple link routines + *  Simple non-static link routines (i.e. referenced outside this file)   */ -static unsigned int align(unsigned int i) +bool tipc_link_is_up(struct tipc_link *l)  { -	return (i + 3) & ~3u; +	return link_is_up(l);  } -static void tipc_link_release(struct kref *kref) +bool tipc_link_is_reset(struct tipc_link *l)  { -	kfree(container_of(kref, struct tipc_link, ref)); +	return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING);  } -static void tipc_link_get(struct tipc_link *l_ptr) +bool tipc_link_is_synching(struct tipc_link *l)  { -	kref_get(&l_ptr->ref); +	return l->state == LINK_SYNCHING;  } -static void tipc_link_put(struct tipc_link *l_ptr) +bool tipc_link_is_failingover(struct tipc_link *l)  { -	kref_put(&l_ptr->ref, tipc_link_release); +	return l->state == LINK_FAILINGOVER;  } -static struct tipc_link *tipc_parallel_link(struct tipc_link *l) +bool tipc_link_is_blocked(struct tipc_link *l)  { -	if (l->owner->active_links[0] != l) -		return l->owner->active_links[0]; -	return l->owner->active_links[1]; +	return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);  } -/* - *  Simple non-static link routines (i.e. referenced outside this file) - */ -int tipc_link_is_up(struct tipc_link *l_ptr) +int tipc_link_is_active(struct tipc_link *l)  { -	if (!l_ptr) -		return 0; -	return link_working_working(l_ptr) || link_working_unknown(l_ptr); +	struct tipc_node *n = l->owner; + +	return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);  } -int tipc_link_is_active(struct tipc_link *l_ptr) +static u32 link_own_addr(struct tipc_link *l)  { -	return	(l_ptr->owner->active_links[0] == l_ptr) || -		(l_ptr->owner->active_links[1] == l_ptr); +	return msg_prevnode(l->pmsg);  }  /** - * link_timeout - handle expiration of link timer - * @l_ptr: pointer to link + * tipc_link_create - create a new link + * @n: pointer to associated node + * @b: pointer to associated bearer + * @ownnode: identity of own node + * @peer: identity of peer node + * @maddr: media address to be used + * @inputq: queue to put messages ready for delivery + * @namedq: queue to put binding table update messages ready for delivery + * @link: return value, pointer to put the created link + * + * Returns true if link was created, otherwise false   */ -static void link_timeout(unsigned long data) +bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, +		      u32 ownnode, u32 peer, struct tipc_media_addr *maddr, +		      struct sk_buff_head *inputq, struct sk_buff_head *namedq, +		      struct tipc_link **link)  { -	struct tipc_link *l_ptr = (struct tipc_link *)data; -	struct sk_buff *skb; +	struct tipc_link *l; +	struct tipc_msg *hdr; +	char *if_name; + +	l = kzalloc(sizeof(*l), GFP_ATOMIC); +	if (!l) +		return false; +	*link = l; + +	/* Note: peer i/f name is completed by reset/activate message */ +	if_name = strchr(b->name, ':') + 1; +	sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", +		tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), +		if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); + +	l->addr = peer; +	l->media_addr = maddr; +	l->owner = n; +	l->peer_session = WILDCARD_SESSION; +	l->bearer_id = b->identity; +	l->tolerance = b->tolerance; +	l->net_plane = b->net_plane; +	l->advertised_mtu = b->mtu; +	l->mtu = b->mtu; +	l->priority = b->priority; +	tipc_link_set_queue_limits(l, b->window); +	l->inputq = inputq; +	l->namedq = namedq; +	l->state = LINK_RESETTING; +	l->pmsg = (struct tipc_msg *)&l->proto_msg; +	hdr = l->pmsg; +	tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer); +	msg_set_size(hdr, sizeof(l->proto_msg)); +	msg_set_session(hdr, session); +	msg_set_bearer_id(hdr, l->bearer_id); +	strcpy((char *)msg_data(hdr), if_name); +	__skb_queue_head_init(&l->transmq); +	__skb_queue_head_init(&l->backlogq); +	__skb_queue_head_init(&l->deferdq); +	skb_queue_head_init(&l->wakeupq); +	skb_queue_head_init(l->inputq); +	return true; +} -	tipc_node_lock(l_ptr->owner); +/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + */ +void tipc_link_build_bcast_sync_msg(struct tipc_link *l, +				    struct sk_buff_head *xmitq) +{ +	struct sk_buff *skb; +	struct sk_buff_head list; +	u16 last_sent; -	/* update counters used in statistical profiling of send traffic */ -	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq); -	l_ptr->stats.queue_sz_counts++; +	skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, +			      0, l->addr, link_own_addr(l), 0, 0, 0); +	if (!skb) +		return; +	last_sent = tipc_bclink_get_last_sent(l->owner->net); +	msg_set_last_bcast(buf_msg(skb), last_sent); +	__skb_queue_head_init(&list); +	__skb_queue_tail(&list, skb); +	tipc_link_xmit(l, &list, xmitq); +} -	skb = skb_peek(&l_ptr->transmq); -	if (skb) { -		struct tipc_msg *msg = buf_msg(skb); -		u32 length = msg_size(msg); +/** + * tipc_link_fsm_evt - link finite state machine + * @l: pointer to link + * @evt: state machine event to be processed + */ +int tipc_link_fsm_evt(struct tipc_link *l, int evt) +{ +	int rc = 0; -		if ((msg_user(msg) == MSG_FRAGMENTER) && -		    (msg_type(msg) == FIRST_FRAGMENT)) { -			length = msg_size(msg_get_wrapped(msg)); +	switch (l->state) { +	case LINK_RESETTING: +		switch (evt) { +		case LINK_PEER_RESET_EVT: +			l->state = LINK_PEER_RESET; +			break; +		case LINK_RESET_EVT: +			l->state = LINK_RESET; +			break; +		case LINK_FAILURE_EVT: +		case LINK_FAILOVER_BEGIN_EVT: +		case LINK_ESTABLISH_EVT: +		case LINK_FAILOVER_END_EVT: +		case LINK_SYNCH_BEGIN_EVT: +		case LINK_SYNCH_END_EVT: +		default: +			goto illegal_evt;  		} -		if (length) { -			l_ptr->stats.msg_lengths_total += length; -			l_ptr->stats.msg_length_counts++; -			if (length <= 64) -				l_ptr->stats.msg_length_profile[0]++; -			else if (length <= 256) -				l_ptr->stats.msg_length_profile[1]++; -			else if (length <= 1024) -				l_ptr->stats.msg_length_profile[2]++; -			else if (length <= 4096) -				l_ptr->stats.msg_length_profile[3]++; -			else if (length <= 16384) -				l_ptr->stats.msg_length_profile[4]++; -			else if (length <= 32768) -				l_ptr->stats.msg_length_profile[5]++; -			else -				l_ptr->stats.msg_length_profile[6]++; +		break; +	case LINK_RESET: +		switch (evt) { +		case LINK_PEER_RESET_EVT: +			l->state = LINK_ESTABLISHING; +			break; +		case LINK_FAILOVER_BEGIN_EVT: +			l->state = LINK_FAILINGOVER; +		case LINK_FAILURE_EVT: +		case LINK_RESET_EVT: +		case LINK_ESTABLISH_EVT: +		case LINK_FAILOVER_END_EVT: +			break; +		case LINK_SYNCH_BEGIN_EVT: +		case LINK_SYNCH_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case LINK_PEER_RESET: +		switch (evt) { +		case LINK_RESET_EVT: +			l->state = LINK_ESTABLISHING; +			break; +		case LINK_PEER_RESET_EVT: +		case LINK_ESTABLISH_EVT: +		case LINK_FAILURE_EVT: +			break; +		case LINK_SYNCH_BEGIN_EVT: +		case LINK_SYNCH_END_EVT: +		case LINK_FAILOVER_BEGIN_EVT: +		case LINK_FAILOVER_END_EVT: +		default: +			goto illegal_evt;  		} +		break; +	case LINK_FAILINGOVER: +		switch (evt) { +		case LINK_FAILOVER_END_EVT: +			l->state = LINK_RESET; +			break; +		case LINK_PEER_RESET_EVT: +		case LINK_RESET_EVT: +		case LINK_ESTABLISH_EVT: +		case LINK_FAILURE_EVT: +			break; +		case LINK_FAILOVER_BEGIN_EVT: +		case LINK_SYNCH_BEGIN_EVT: +		case LINK_SYNCH_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case LINK_ESTABLISHING: +		switch (evt) { +		case LINK_ESTABLISH_EVT: +			l->state = LINK_ESTABLISHED; +			rc |= TIPC_LINK_UP_EVT; +			break; +		case LINK_FAILOVER_BEGIN_EVT: +			l->state = LINK_FAILINGOVER; +			break; +		case LINK_PEER_RESET_EVT: +		case LINK_RESET_EVT: +		case LINK_FAILURE_EVT: +		case LINK_SYNCH_BEGIN_EVT: +		case LINK_FAILOVER_END_EVT: +			break; +		case LINK_SYNCH_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case LINK_ESTABLISHED: +		switch (evt) { +		case LINK_PEER_RESET_EVT: +			l->state = LINK_PEER_RESET; +			rc |= TIPC_LINK_DOWN_EVT; +			break; +		case LINK_FAILURE_EVT: +			l->state = LINK_RESETTING; +			rc |= TIPC_LINK_DOWN_EVT; +			break; +		case LINK_RESET_EVT: +			l->state = LINK_RESET; +			break; +		case LINK_ESTABLISH_EVT: +		case LINK_SYNCH_END_EVT: +			break; +		case LINK_SYNCH_BEGIN_EVT: +			l->state = LINK_SYNCHING; +			break; +		case LINK_FAILOVER_BEGIN_EVT: +		case LINK_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case LINK_SYNCHING: +		switch (evt) { +		case LINK_PEER_RESET_EVT: +			l->state = LINK_PEER_RESET; +			rc |= TIPC_LINK_DOWN_EVT; +			break; +		case LINK_FAILURE_EVT: +			l->state = LINK_RESETTING; +			rc |= TIPC_LINK_DOWN_EVT; +			break; +		case LINK_RESET_EVT: +			l->state = LINK_RESET; +			break; +		case LINK_ESTABLISH_EVT: +		case LINK_SYNCH_BEGIN_EVT: +			break; +		case LINK_SYNCH_END_EVT: +			l->state = LINK_ESTABLISHED; +			break; +		case LINK_FAILOVER_BEGIN_EVT: +		case LINK_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	default: +		pr_err("Unknown FSM state %x in %s\n", l->state, l->name);  	} - -	/* do all other link processing performed on a periodic basis */ -	if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner)) -		link_state_event(l_ptr, SILENCE_EVT); -	l_ptr->silent_intv_cnt++; -	if (skb_queue_len(&l_ptr->backlogq)) -		tipc_link_push_packets(l_ptr); -	link_set_timer(l_ptr, l_ptr->keepalive_intv); -	tipc_node_unlock(l_ptr->owner); -	tipc_link_put(l_ptr); -} - -static void link_set_timer(struct tipc_link *link, unsigned long time) -{ -	if (!mod_timer(&link->timer, jiffies + time)) -		tipc_link_get(link); +	return rc; +illegal_evt: +	pr_err("Illegal FSM event %x in state %x on link %s\n", +	       evt, l->state, l->name); +	return rc;  } -/** - * tipc_link_create - create a new link - * @n_ptr: pointer to associated node - * @b_ptr: pointer to associated bearer - * @media_addr: media address to use when sending messages over link - * - * Returns pointer to link. +/* link_profile_stats - update statistical profiling of traffic   */ -struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, -				   struct tipc_bearer *b_ptr, -				   const struct tipc_media_addr *media_addr) +static void link_profile_stats(struct tipc_link *l)  { -	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); -	struct tipc_link *l_ptr; +	struct sk_buff *skb;  	struct tipc_msg *msg; -	char *if_name; -	char addr_string[16]; -	u32 peer = n_ptr->addr; +	int length; -	if (n_ptr->link_cnt >= MAX_BEARERS) { -		tipc_addr_string_fill(addr_string, n_ptr->addr); -		pr_err("Cannot establish %uth link to %s. Max %u allowed.\n", -		       n_ptr->link_cnt, addr_string, MAX_BEARERS); -		return NULL; -	} +	/* Update counters used in statistical profiling of send traffic */ +	l->stats.accu_queue_sz += skb_queue_len(&l->transmq); +	l->stats.queue_sz_counts++; -	if (n_ptr->links[b_ptr->identity]) { -		tipc_addr_string_fill(addr_string, n_ptr->addr); -		pr_err("Attempt to establish second link on <%s> to %s\n", -		       b_ptr->name, addr_string); -		return NULL; -	} +	skb = skb_peek(&l->transmq); +	if (!skb) +		return; +	msg = buf_msg(skb); +	length = msg_size(msg); -	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); -	if (!l_ptr) { -		pr_warn("Link creation failed, no memory\n"); -		return NULL; +	if (msg_user(msg) == MSG_FRAGMENTER) { +		if (msg_type(msg) != FIRST_FRAGMENT) +			return; +		length = msg_size(msg_get_wrapped(msg));  	} -	kref_init(&l_ptr->ref); -	l_ptr->addr = peer; -	if_name = strchr(b_ptr->name, ':') + 1; -	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown", -		tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr), -		tipc_node(tn->own_addr), -		if_name, -		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); -		/* note: peer i/f name is updated by reset/activate message */ -	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); -	l_ptr->owner = n_ptr; -	l_ptr->peer_session = INVALID_SESSION; -	l_ptr->bearer_id = b_ptr->identity; -	link_set_supervision_props(l_ptr, b_ptr->tolerance); -	l_ptr->state = RESET_UNKNOWN; - -	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; -	msg = l_ptr->pmsg; -	tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, -		      l_ptr->addr); -	msg_set_size(msg, sizeof(l_ptr->proto_msg)); -	msg_set_session(msg, (tn->random & 0xffff)); -	msg_set_bearer_id(msg, b_ptr->identity); -	strcpy((char *)msg_data(msg), if_name); -	l_ptr->net_plane = b_ptr->net_plane; -	l_ptr->advertised_mtu = b_ptr->mtu; -	l_ptr->mtu = l_ptr->advertised_mtu; -	l_ptr->priority = b_ptr->priority; -	tipc_link_set_queue_limits(l_ptr, b_ptr->window); -	l_ptr->snd_nxt = 1; -	__skb_queue_head_init(&l_ptr->transmq); -	__skb_queue_head_init(&l_ptr->backlogq); -	__skb_queue_head_init(&l_ptr->deferdq); -	skb_queue_head_init(&l_ptr->wakeupq); -	skb_queue_head_init(&l_ptr->inputq); -	skb_queue_head_init(&l_ptr->namedq); -	link_reset_statistics(l_ptr); -	tipc_node_attach_link(n_ptr, l_ptr); -	setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); -	link_state_event(l_ptr, STARTING_EVT); - -	return l_ptr; +	l->stats.msg_lengths_total += length; +	l->stats.msg_length_counts++; +	if (length <= 64) +		l->stats.msg_length_profile[0]++; +	else if (length <= 256) +		l->stats.msg_length_profile[1]++; +	else if (length <= 1024) +		l->stats.msg_length_profile[2]++; +	else if (length <= 4096) +		l->stats.msg_length_profile[3]++; +	else if (length <= 16384) +		l->stats.msg_length_profile[4]++; +	else if (length <= 32768) +		l->stats.msg_length_profile[5]++; +	else +		l->stats.msg_length_profile[6]++;  } -/** - * tipc_link_delete - Delete a link - * @l: link to be deleted +/* tipc_link_timeout - perform periodic task as instructed from node timeout   */ -void tipc_link_delete(struct tipc_link *l) +int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)  { -	tipc_link_reset(l); -	if (del_timer(&l->timer)) -		tipc_link_put(l); -	l->flags |= LINK_STOPPED; -	/* Delete link now, or when timer is finished: */ -	tipc_link_reset_fragments(l); -	tipc_node_detach_link(l->owner, l); -	tipc_link_put(l); -} +	int rc = 0; +	int mtyp = STATE_MSG; +	bool xmit = false; +	bool prb = false; + +	link_profile_stats(l); + +	switch (l->state) { +	case LINK_ESTABLISHED: +	case LINK_SYNCHING: +		if (!l->silent_intv_cnt) { +			if (tipc_bclink_acks_missing(l->owner)) +				xmit = true; +		} else if (l->silent_intv_cnt <= l->abort_limit) { +			xmit = true; +			prb = true; +		} else { +			rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT); +		} +		l->silent_intv_cnt++; +		break; +	case LINK_RESET: +		xmit = true; +		mtyp = RESET_MSG; +		break; +	case LINK_ESTABLISHING: +		xmit = true; +		mtyp = ACTIVATE_MSG; +		break; +	case LINK_PEER_RESET: +	case LINK_RESETTING: +	case LINK_FAILINGOVER: +		break; +	default: +		break; +	} -void tipc_link_delete_list(struct net *net, unsigned int bearer_id) -{ -	struct tipc_net *tn = net_generic(net, tipc_net_id); -	struct tipc_link *link; -	struct tipc_node *node; +	if (xmit) +		tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq); -	rcu_read_lock(); -	list_for_each_entry_rcu(node, &tn->node_list, list) { -		tipc_node_lock(node); -		link = node->links[bearer_id]; -		if (link) -			tipc_link_delete(link); -		tipc_node_unlock(node); -	} -	rcu_read_unlock(); +	return rc;  }  /** @@ -334,7 +490,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)   * @link: congested link   * @list: message that was attempted sent   * Create pseudo msg to send back to user when congestion abates - * Only consumes message if there is an error + * Does not consume buffer list   */  static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)  { @@ -347,8 +503,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)  	/* This really cannot happen...  */  	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {  		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); -		tipc_link_reset(link); -		goto err; +		return -ENOBUFS;  	}  	/* Non-blocking sender: */  	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) @@ -358,15 +513,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)  	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,  			      addr, addr, oport, 0, 0);  	if (!skb) -		goto err; +		return -ENOBUFS;  	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);  	TIPC_SKB_CB(skb)->chain_imp = imp;  	skb_queue_tail(&link->wakeupq, skb);  	link->stats.link_congs++;  	return -ELINKCONG; -err: -	__skb_queue_purge(list); -	return -ENOBUFS;  }  /** @@ -388,9 +540,7 @@ void link_prepare_wakeup(struct tipc_link *l)  		if ((pnd[imp] + l->backlog[imp].len) >= lim)  			break;  		skb_unlink(skb, &l->wakeupq); -		skb_queue_tail(&l->inputq, skb); -		l->owner->inputq = &l->inputq; -		l->owner->action_flags |= TIPC_MSG_EVT; +		skb_queue_tail(l->inputq, skb);  	}  } @@ -426,208 +576,36 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)  	tipc_link_reset_fragments(l_ptr);  } -void tipc_link_reset(struct tipc_link *l_ptr) +void tipc_link_reset(struct tipc_link *l)  { -	u32 prev_state = l_ptr->state; -	int was_active_link = tipc_link_is_active(l_ptr); -	struct tipc_node *owner = l_ptr->owner; -	struct tipc_link *pl = tipc_parallel_link(l_ptr); - -	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); +	tipc_link_fsm_evt(l, LINK_RESET_EVT);  	/* Link is down, accept any session */ -	l_ptr->peer_session = INVALID_SESSION; - -	/* Prepare for renewed mtu size negotiation */ -	l_ptr->mtu = l_ptr->advertised_mtu; - -	l_ptr->state = RESET_UNKNOWN; +	l->peer_session = WILDCARD_SESSION; -	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) -		return; - -	tipc_node_link_down(l_ptr->owner, l_ptr); -	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr); +	/* If peer is up, it only accepts an incremented session number */ +	msg_set_session(l->pmsg, msg_session(l->pmsg) + 1); -	if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) { -		l_ptr->flags |= LINK_FAILINGOVER; -		l_ptr->failover_checkpt = l_ptr->rcv_nxt; -		pl->failover_pkts = FIRST_FAILOVER; -		pl->failover_checkpt = l_ptr->rcv_nxt; -		pl->failover_skb = l_ptr->reasm_buf; -	} else { -		kfree_skb(l_ptr->reasm_buf); -	} -	/* Clean up all queues, except inputq: */ -	__skb_queue_purge(&l_ptr->transmq); -	__skb_queue_purge(&l_ptr->deferdq); -	if (!owner->inputq) -		owner->inputq = &l_ptr->inputq; -	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq); -	if (!skb_queue_empty(owner->inputq)) -		owner->action_flags |= TIPC_MSG_EVT; -	tipc_link_purge_backlog(l_ptr); -	l_ptr->reasm_buf = NULL; -	l_ptr->rcv_unacked = 0; -	l_ptr->snd_nxt = 1; -	l_ptr->silent_intv_cnt = 0; -	l_ptr->stale_count = 0; -	link_reset_statistics(l_ptr); -} - -static void link_activate(struct tipc_link *link) -{ -	struct tipc_node *node = link->owner; - -	link->rcv_nxt = 1; -	link->stats.recv_info = 1; -	link->silent_intv_cnt = 0; -	tipc_node_link_up(node, link); -	tipc_bearer_add_dest(node->net, link->bearer_id, link->addr); -} - -/** - * link_state_event - link finite state machine - * @l_ptr: pointer to link - * @event: state machine event to process - */ -static void link_state_event(struct tipc_link *l_ptr, unsigned int event) -{ -	struct tipc_link *other; -	unsigned long timer_intv = l_ptr->keepalive_intv; - -	if (l_ptr->flags & LINK_STOPPED) -		return; - -	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT)) -		return;		/* Not yet. */ - -	if (l_ptr->flags & LINK_FAILINGOVER) -		return; - -	switch (l_ptr->state) { -	case WORKING_WORKING: -		switch (event) { -		case TRAFFIC_MSG_EVT: -		case ACTIVATE_MSG: -			l_ptr->silent_intv_cnt = 0; -			break; -		case SILENCE_EVT: -			if (!l_ptr->silent_intv_cnt) { -				if (tipc_bclink_acks_missing(l_ptr->owner)) -					tipc_link_proto_xmit(l_ptr, STATE_MSG, -							     0, 0, 0, 0); -				break; -			} -			l_ptr->state = WORKING_UNKNOWN; -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); -			break; -		case RESET_MSG: -			pr_debug("%s<%s>, requested by peer\n", -				 link_rst_msg, l_ptr->name); -			tipc_link_reset(l_ptr); -			l_ptr->state = RESET_RESET; -			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0); -			break; -		default: -			pr_debug("%s%u in WW state\n", link_unk_evt, event); -		} -		break; -	case WORKING_UNKNOWN: -		switch (event) { -		case TRAFFIC_MSG_EVT: -		case ACTIVATE_MSG: -			l_ptr->state = WORKING_WORKING; -			l_ptr->silent_intv_cnt = 0; -			break; -		case RESET_MSG: -			pr_debug("%s<%s>, requested by peer while probing\n", -				 link_rst_msg, l_ptr->name); -			tipc_link_reset(l_ptr); -			l_ptr->state = RESET_RESET; -			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0); -			break; -		case SILENCE_EVT: -			if (!l_ptr->silent_intv_cnt) { -				l_ptr->state = WORKING_WORKING; -				if (tipc_bclink_acks_missing(l_ptr->owner)) -					tipc_link_proto_xmit(l_ptr, STATE_MSG, -							     0, 0, 0, 0); -			} else if (l_ptr->silent_intv_cnt < -				   l_ptr->abort_limit) { -				tipc_link_proto_xmit(l_ptr, STATE_MSG, -						     1, 0, 0, 0); -			} else {	/* Link has failed */ -				pr_debug("%s<%s>, peer not responding\n", -					 link_rst_msg, l_ptr->name); -				tipc_link_reset(l_ptr); -				l_ptr->state = RESET_UNKNOWN; -				tipc_link_proto_xmit(l_ptr, RESET_MSG, -						     0, 0, 0, 0); -			} -			break; -		default: -			pr_err("%s%u in WU state\n", link_unk_evt, event); -		} -		break; -	case RESET_UNKNOWN: -		switch (event) { -		case TRAFFIC_MSG_EVT: -			break; -		case ACTIVATE_MSG: -			other = l_ptr->owner->active_links[0]; -			if (other && link_working_unknown(other)) -				break; -			l_ptr->state = WORKING_WORKING; -			link_activate(l_ptr); -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); -			if (l_ptr->owner->working_links == 1) -				tipc_link_sync_xmit(l_ptr); -			break; -		case RESET_MSG: -			l_ptr->state = RESET_RESET; -			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     1, 0, 0, 0); -			break; -		case STARTING_EVT: -			l_ptr->flags |= LINK_STARTED; -			link_set_timer(l_ptr, timer_intv); -			break; -		case SILENCE_EVT: -			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0); -			break; -		default: -			pr_err("%s%u in RU state\n", link_unk_evt, event); -		} -		break; -	case RESET_RESET: -		switch (event) { -		case TRAFFIC_MSG_EVT: -		case ACTIVATE_MSG: -			other = l_ptr->owner->active_links[0]; -			if (other && link_working_unknown(other)) -				break; -			l_ptr->state = WORKING_WORKING; -			link_activate(l_ptr); -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0); -			if (l_ptr->owner->working_links == 1) -				tipc_link_sync_xmit(l_ptr); -			break; -		case RESET_MSG: -			break; -		case SILENCE_EVT: -			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, -					     0, 0, 0, 0); -			break; -		default: -			pr_err("%s%u in RR state\n", link_unk_evt, event); -		} -		break; -	default: -		pr_err("Unknown link state %u/%u\n", l_ptr->state, event); -	} +	/* Prepare for renewed mtu size negotiation */ +	l->mtu = l->advertised_mtu; + +	/* Clean up all queues: */ +	__skb_queue_purge(&l->transmq); +	__skb_queue_purge(&l->deferdq); +	skb_queue_splice_init(&l->wakeupq, l->inputq); + +	tipc_link_purge_backlog(l); +	kfree_skb(l->reasm_buf); +	kfree_skb(l->failover_reasm_skb); +	l->reasm_buf = NULL; +	l->failover_reasm_skb = NULL; +	l->rcv_unacked = 0; +	l->snd_nxt = 1; +	l->rcv_nxt = 1; +	l->silent_intv_cnt = 0; +	l->stats.recv_info = 0; +	l->stale_count = 0; +	link_reset_statistics(l);  }  /** @@ -635,8 +613,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)   * @link: link to use   * @list: chain of buffers containing message   * - * Consumes the buffer chain, except when returning -ELINKCONG, - * since the caller then may want to make more send attempts. + * Consumes the buffer chain, except when returning an error code,   * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS   * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted   */ @@ -650,7 +627,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,  	u16 ack = mod(link->rcv_nxt - 1);  	u16 seqno = link->snd_nxt;  	u16 bc_last_in = link->owner->bclink.last_in; -	struct tipc_media_addr *addr = &link->media_addr; +	struct tipc_media_addr *addr = link->media_addr;  	struct sk_buff_head *transmq = &link->transmq;  	struct sk_buff_head *backlogq = &link->backlogq;  	struct sk_buff *skb, *bskb; @@ -660,10 +637,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,  		if (unlikely(link->backlog[i].len >= link->backlog[i].limit))  			return link_schedule_user(link, list);  	} -	if (unlikely(msg_size(msg) > mtu)) { -		__skb_queue_purge(list); +	if (unlikely(msg_size(msg) > mtu))  		return -EMSGSIZE; -	} +  	/* Prepare each packet for sending, and add to relevant queue: */  	while (skb_queue_len(list)) {  		skb = skb_peek(list); @@ -700,101 +676,76 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,  	return 0;  } -static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) -{ -	skb_queue_head_init(list); -	__skb_queue_tail(list, skb); -} - -static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) -{ -	struct sk_buff_head head; - -	skb2list(skb, &head); -	return __tipc_link_xmit(link->owner->net, link, &head); -} - -/* tipc_link_xmit_skb(): send single buffer to destination - * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE - * messages, which will not be rejected - * The only exception is datagram messages rerouted after secondary - * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable - */ -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, -		       u32 selector) -{ -	struct sk_buff_head head; -	int rc; - -	skb2list(skb, &head); -	rc = tipc_link_xmit(net, &head, dnode, selector); -	if (rc == -ELINKCONG) -		kfree_skb(skb); -	return 0; -} -  /** - * tipc_link_xmit() is the general link level function for message sending - * @net: the applicable net namespace + * tipc_link_xmit(): enqueue buffer list according to queue situation + * @link: link to use   * @list: chain of buffers containing message - * @dsz: amount of user data to be sent - * @dnode: address of destination node - * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning -ELINKCONG - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + * @xmitq: returned list of packets to be sent by caller + * + * Consumes the buffer chain, except when returning -ELINKCONG, + * since the caller then may want to make more send attempts. + * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS + * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted   */ -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, -		   u32 selector) +int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, +		   struct sk_buff_head *xmitq)  { -	struct tipc_link *link = NULL; -	struct tipc_node *node; -	int rc = -EHOSTUNREACH; +	struct tipc_msg *hdr = buf_msg(skb_peek(list)); +	unsigned int maxwin = l->window; +	unsigned int i, imp = msg_importance(hdr); +	unsigned int mtu = l->mtu; +	u16 ack = l->rcv_nxt - 1; +	u16 seqno = l->snd_nxt; +	u16 bc_last_in = l->owner->bclink.last_in; +	struct sk_buff_head *transmq = &l->transmq; +	struct sk_buff_head *backlogq = &l->backlogq; +	struct sk_buff *skb, *_skb, *bskb; -	node = tipc_node_find(net, dnode); -	if (node) { -		tipc_node_lock(node); -		link = node->active_links[selector & 1]; -		if (link) -			rc = __tipc_link_xmit(net, link, list); -		tipc_node_unlock(node); -		tipc_node_put(node); -	} -	if (link) -		return rc; - -	if (likely(in_own_node(net, dnode))) { -		tipc_sk_rcv(net, list); -		return 0; +	/* Match msg importance against this and all higher backlog limits: */ +	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { +		if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) +			return link_schedule_user(l, list);  	} +	if (unlikely(msg_size(hdr) > mtu)) +		return -EMSGSIZE; -	__skb_queue_purge(list); -	return rc; -} - -/* - * tipc_link_sync_xmit - synchronize broadcast link endpoints. - * - * Give a newly added peer node the sequence number where it should - * start receiving and acking broadcast packets. - * - * Called with node locked - */ -static void tipc_link_sync_xmit(struct tipc_link *link) -{ -	struct sk_buff *skb; -	struct tipc_msg *msg; - -	skb = tipc_buf_acquire(INT_H_SIZE); -	if (!skb) -		return; +	/* Prepare each packet for sending, and add to relevant queue: */ +	while (skb_queue_len(list)) { +		skb = skb_peek(list); +		hdr = buf_msg(skb); +		msg_set_seqno(hdr, seqno); +		msg_set_ack(hdr, ack); +		msg_set_bcast_ack(hdr, bc_last_in); -	msg = buf_msg(skb); -	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG, -		      INT_H_SIZE, link->addr); -	msg_set_last_bcast(msg, link->owner->bclink.acked); -	__tipc_link_xmit_skb(link, skb); +		if (likely(skb_queue_len(transmq) < maxwin)) { +			_skb = skb_clone(skb, GFP_ATOMIC); +			if (!_skb) +				return -ENOBUFS; +			__skb_dequeue(list); +			__skb_queue_tail(transmq, skb); +			__skb_queue_tail(xmitq, _skb); +			l->rcv_unacked = 0; +			seqno++; +			continue; +		} +		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { +			kfree_skb(__skb_dequeue(list)); +			l->stats.sent_bundled++; +			continue; +		} +		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { +			kfree_skb(__skb_dequeue(list)); +			__skb_queue_tail(backlogq, bskb); +			l->backlog[msg_importance(buf_msg(bskb))].len++; +			l->stats.sent_bundled++; +			l->stats.sent_bundles++; +			continue; +		} +		l->backlog[imp].len += skb_queue_len(list); +		skb_queue_splice_tail_init(list, backlogq); +	} +	l->snd_nxt = seqno; +	return 0;  }  /* @@ -842,29 +793,37 @@ void tipc_link_push_packets(struct tipc_link *link)  		link->rcv_unacked = 0;  		__skb_queue_tail(&link->transmq, skb);  		tipc_bearer_send(link->owner->net, link->bearer_id, -				 skb, &link->media_addr); +				 skb, link->media_addr);  	}  	link->snd_nxt = seqno;  } -void tipc_link_reset_all(struct tipc_node *node) +void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)  { -	char addr_string[16]; -	u32 i; - -	tipc_node_lock(node); +	struct sk_buff *skb, *_skb; +	struct tipc_msg *hdr; +	u16 seqno = l->snd_nxt; +	u16 ack = l->rcv_nxt - 1; -	pr_warn("Resetting all links to %s\n", -		tipc_addr_string_fill(addr_string, node->addr)); - -	for (i = 0; i < MAX_BEARERS; i++) { -		if (node->links[i]) { -			link_print(node->links[i], "Resetting link\n"); -			tipc_link_reset(node->links[i]); -		} +	while (skb_queue_len(&l->transmq) < l->window) { +		skb = skb_peek(&l->backlogq); +		if (!skb) +			break; +		_skb = skb_clone(skb, GFP_ATOMIC); +		if (!_skb) +			break; +		__skb_dequeue(&l->backlogq); +		hdr = buf_msg(skb); +		l->backlog[msg_importance(hdr)].len--; +		__skb_queue_tail(&l->transmq, skb); +		__skb_queue_tail(xmitq, _skb); +		msg_set_ack(hdr, ack); +		msg_set_seqno(hdr, seqno); +		msg_set_bcast_ack(hdr, l->owner->bclink.last_in); +		l->rcv_unacked = 0; +		seqno++;  	} - -	tipc_node_unlock(node); +	l->snd_nxt = seqno;  }  static void link_retransmit_failure(struct tipc_link *l_ptr, @@ -877,9 +836,12 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,  	if (l_ptr->addr) {  		/* Handle failure on standard link */ -		link_print(l_ptr, "Resetting link\n"); -		tipc_link_reset(l_ptr); - +		link_print(l_ptr, "Resetting link "); +		pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", +			msg_user(msg), msg_type(msg), msg_size(msg), +			msg_errcode(msg)); +		pr_info("sqno %u, prev: %x, src: %x\n", +			msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));  	} else {  		/* Handle failure on broadcast link */  		struct tipc_node *n_ptr; @@ -934,191 +896,45 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,  		msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));  		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);  		tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb, -				 &l_ptr->media_addr); +				 l_ptr->media_addr);  		retransmits--;  		l_ptr->stats.retransmitted++;  	}  } -/* link_synch(): check if all packets arrived before the synch - *               point have been consumed - * Returns true if the parallel links are synched, otherwise false - */ -static bool link_synch(struct tipc_link *l) +static int tipc_link_retransm(struct tipc_link *l, int retransm, +			      struct sk_buff_head *xmitq)  { -	unsigned int post_synch; -	struct tipc_link *pl; +	struct sk_buff *_skb, *skb = skb_peek(&l->transmq); +	struct tipc_msg *hdr; -	pl  = tipc_parallel_link(l); -	if (pl == l) -		goto synched; - -	/* Was last pre-synch packet added to input queue ? */ -	if (less_eq(pl->rcv_nxt, l->synch_point)) -		return false; - -	/* Is it still in the input queue ? */ -	post_synch = mod(pl->rcv_nxt - l->synch_point) - 1; -	if (skb_queue_len(&pl->inputq) > post_synch) -		return false; -synched: -	l->flags &= ~LINK_SYNCHING; -	return true; -} - -static void link_retrieve_defq(struct tipc_link *link, -			       struct sk_buff_head *list) -{ -	u16 seq_no; - -	if (skb_queue_empty(&link->deferdq)) -		return; - -	seq_no = buf_seqno(skb_peek(&link->deferdq)); -	if (seq_no == link->rcv_nxt) -		skb_queue_splice_tail_init(&link->deferdq, list); -} - -/** - * tipc_rcv - process TIPC packets/messages arriving from off-node - * @net: the applicable net namespace - * @skb: TIPC packet - * @b_ptr: pointer to bearer message arrived on - * - * Invoked with no locks held.  Bearer pointer must point to a valid bearer - * structure (i.e. cannot be NULL), but bearer can be inactive. - */ -void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr) -{ -	struct tipc_net *tn = net_generic(net, tipc_net_id); -	struct sk_buff_head head; -	struct tipc_node *n_ptr; -	struct tipc_link *l_ptr; -	struct sk_buff *skb1, *tmp; -	struct tipc_msg *msg; -	u16 seq_no; -	u16 ackd; -	u32 released; - -	skb2list(skb, &head); - -	while ((skb = __skb_dequeue(&head))) { -		/* Ensure message is well-formed */ -		if (unlikely(!tipc_msg_validate(skb))) -			goto discard; - -		/* Handle arrival of a non-unicast link message */ -		msg = buf_msg(skb); -		if (unlikely(msg_non_seq(msg))) { -			if (msg_user(msg) ==  LINK_CONFIG) -				tipc_disc_rcv(net, skb, b_ptr); -			else -				tipc_bclink_rcv(net, skb); -			continue; -		} - -		/* Discard unicast link messages destined for another node */ -		if (unlikely(!msg_short(msg) && -			     (msg_destnode(msg) != tn->own_addr))) -			goto discard; - -		/* Locate neighboring node that sent message */ -		n_ptr = tipc_node_find(net, msg_prevnode(msg)); -		if (unlikely(!n_ptr)) -			goto discard; - -		tipc_node_lock(n_ptr); -		/* Locate unicast link endpoint that should handle message */ -		l_ptr = n_ptr->links[b_ptr->identity]; -		if (unlikely(!l_ptr)) -			goto unlock; - -		/* Verify that communication with node is currently allowed */ -		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && -		    msg_user(msg) == LINK_PROTOCOL && -		    (msg_type(msg) == RESET_MSG || -		    msg_type(msg) == ACTIVATE_MSG) && -		    !msg_redundant_link(msg)) -			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; - -		if (tipc_node_blocked(n_ptr)) -			goto unlock; - -		/* Validate message sequence number info */ -		seq_no = msg_seqno(msg); -		ackd = msg_ack(msg); - -		/* Release acked messages */ -		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg))) -			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); - -		released = 0; -		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) { -			if (more(buf_seqno(skb1), ackd)) -				break; -			 __skb_unlink(skb1, &l_ptr->transmq); -			 kfree_skb(skb1); -			 released = 1; -		} - -		/* Try sending any messages link endpoint has pending */ -		if (unlikely(skb_queue_len(&l_ptr->backlogq))) -			tipc_link_push_packets(l_ptr); - -		if (released && !skb_queue_empty(&l_ptr->wakeupq)) -			link_prepare_wakeup(l_ptr); - -		/* Process the incoming packet */ -		if (unlikely(!link_working_working(l_ptr))) { -			if (msg_user(msg) == LINK_PROTOCOL) { -				tipc_link_proto_rcv(l_ptr, skb); -				link_retrieve_defq(l_ptr, &head); -				skb = NULL; -				goto unlock; -			} - -			/* Traffic message. Conditionally activate link */ -			link_state_event(l_ptr, TRAFFIC_MSG_EVT); - -			if (link_working_working(l_ptr)) { -				/* Re-insert buffer in front of queue */ -				__skb_queue_head(&head, skb); -				skb = NULL; -				goto unlock; -			} -			goto unlock; -		} - -		/* Link is now in state WORKING_WORKING */ -		if (unlikely(seq_no != l_ptr->rcv_nxt)) { -			link_handle_out_of_seq_msg(l_ptr, skb); -			link_retrieve_defq(l_ptr, &head); -			skb = NULL; -			goto unlock; -		} -		l_ptr->silent_intv_cnt = 0; +	if (!skb) +		return 0; -		/* Synchronize with parallel link if applicable */ -		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) { -			if (!link_synch(l_ptr)) -				goto unlock; -		} -		l_ptr->rcv_nxt++; -		if (unlikely(!skb_queue_empty(&l_ptr->deferdq))) -			link_retrieve_defq(l_ptr, &head); -		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) { -			l_ptr->stats.sent_acks++; -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); -		} -		tipc_link_input(l_ptr, skb); -		skb = NULL; -unlock: -		tipc_node_unlock(n_ptr); -		tipc_node_put(n_ptr); -discard: -		if (unlikely(skb)) -			kfree_skb(skb); +	/* Detect repeated retransmit failures on same packet */ +	if (likely(l->last_retransm != buf_seqno(skb))) { +		l->last_retransm = buf_seqno(skb); +		l->stale_count = 1; +	} else if (++l->stale_count > 100) { +		link_retransmit_failure(l, skb); +		return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); +	} +	skb_queue_walk(&l->transmq, skb) { +		if (!retransm) +			return 0; +		hdr = buf_msg(skb); +		_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); +		if (!_skb) +			return 0; +		hdr = buf_msg(_skb); +		msg_set_ack(hdr, l->rcv_nxt - 1); +		msg_set_bcast_ack(hdr, l->owner->bclink.last_in); +		_skb->priority = TC_PRIO_CONTROL; +		__skb_queue_tail(xmitq, _skb); +		retransm--; +		l->stats.retransmitted++;  	} +	return 0;  }  /* tipc_data_input - deliver data and name distr msgs to upper layer @@ -1126,29 +942,22 @@ discard:   * Consumes buffer if message is of right type   * Node lock must be held   */ -static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb) +static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, +			    struct sk_buff_head *inputq)  {  	struct tipc_node *node = link->owner; -	struct tipc_msg *msg = buf_msg(skb); -	u32 dport = msg_destport(msg); -	switch (msg_user(msg)) { +	switch (msg_user(buf_msg(skb))) {  	case TIPC_LOW_IMPORTANCE:  	case TIPC_MEDIUM_IMPORTANCE:  	case TIPC_HIGH_IMPORTANCE:  	case TIPC_CRITICAL_IMPORTANCE:  	case CONN_MANAGER: -		if (tipc_skb_queue_tail(&link->inputq, skb, dport)) { -			node->inputq = &link->inputq; -			node->action_flags |= TIPC_MSG_EVT; -		} +		__skb_queue_tail(inputq, skb);  		return true;  	case NAME_DISTRIBUTOR:  		node->bclink.recv_permitted = true; -		node->namedq = &link->namedq; -		skb_queue_tail(&link->namedq, skb); -		if (skb_queue_len(&link->namedq) == 1) -			node->action_flags |= TIPC_NAMED_MSG_EVT; +		skb_queue_tail(link->namedq, skb);  		return true;  	case MSG_BUNDLER:  	case TUNNEL_PROTOCOL: @@ -1165,54 +974,160 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)  /* tipc_link_input - process packet that has passed link protocol check   *   * Consumes buffer - * Node lock must be held   */ -static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) +static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, +			   struct sk_buff_head *inputq)  { -	struct tipc_node *node = link->owner; -	struct tipc_msg *msg = buf_msg(skb); +	struct tipc_node *node = l->owner; +	struct tipc_msg *hdr = buf_msg(skb); +	struct sk_buff **reasm_skb = &l->reasm_buf;  	struct sk_buff *iskb; +	int usr = msg_user(hdr); +	int rc = 0;  	int pos = 0; +	int ipos = 0; -	if (likely(tipc_data_input(link, skb))) -		return; +	if (unlikely(usr == TUNNEL_PROTOCOL)) { +		if (msg_type(hdr) == SYNCH_MSG) { +			__skb_queue_purge(&l->deferdq); +			goto drop; +		} +		if (!tipc_msg_extract(skb, &iskb, &ipos)) +			return rc; +		kfree_skb(skb); +		skb = iskb; +		hdr = buf_msg(skb); +		if (less(msg_seqno(hdr), l->drop_point)) +			goto drop; +		if (tipc_data_input(l, skb, inputq)) +			return rc; +		usr = msg_user(hdr); +		reasm_skb = &l->failover_reasm_skb; +	} -	switch (msg_user(msg)) { -	case TUNNEL_PROTOCOL: -		if (msg_dup(msg)) { -			link->flags |= LINK_SYNCHING; -			link->synch_point = msg_seqno(msg_get_wrapped(msg)); -			kfree_skb(skb); -			break; +	if (usr == MSG_BUNDLER) { +		l->stats.recv_bundles++; +		l->stats.recv_bundled += msg_msgcnt(hdr); +		while (tipc_msg_extract(skb, &iskb, &pos)) +			tipc_data_input(l, iskb, inputq); +		return 0; +	} else if (usr == MSG_FRAGMENTER) { +		l->stats.recv_fragments++; +		if (tipc_buf_append(reasm_skb, &skb)) { +			l->stats.recv_fragmented++; +			tipc_data_input(l, skb, inputq); +		} else if (!*reasm_skb) { +			return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);  		} -		if (!tipc_link_failover_rcv(link, &skb)) -			break; -		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { -			tipc_data_input(link, skb); +		return 0; +	} else if (usr == BCAST_PROTOCOL) { +		tipc_link_sync_rcv(node, skb); +		return 0; +	} +drop: +	kfree_skb(skb); +	return 0; +} + +static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) +{ +	bool released = false; +	struct sk_buff *skb, *tmp; + +	skb_queue_walk_safe(&l->transmq, skb, tmp) { +		if (more(buf_seqno(skb), acked))  			break; +		__skb_unlink(skb, &l->transmq); +		kfree_skb(skb); +		released = true; +	} +	return released; +} + +/* tipc_link_rcv - process TIPC packets/messages arriving from off-node + * @link: the link that should handle the message + * @skb: TIPC packet + * @xmitq: queue to place packets to be sent after this call + */ +int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, +		  struct sk_buff_head *xmitq) +{ +	struct sk_buff_head *arrvq = &l->deferdq; +	struct sk_buff_head tmpq; +	struct tipc_msg *hdr; +	u16 seqno, rcv_nxt; +	int rc = 0; + +	__skb_queue_head_init(&tmpq); + +	if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) { +		if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV)) +			tipc_link_build_proto_msg(l, STATE_MSG, 0, +						  0, 0, 0, xmitq); +		return rc; +	} + +	while ((skb = skb_peek(arrvq))) { +		hdr = buf_msg(skb); + +		/* Verify and update link state */ +		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) { +			__skb_dequeue(arrvq); +			rc = tipc_link_proto_rcv(l, skb, xmitq); +			continue;  		} -	case MSG_BUNDLER: -		link->stats.recv_bundles++; -		link->stats.recv_bundled += msg_msgcnt(msg); -		while (tipc_msg_extract(skb, &iskb, &pos)) -			tipc_data_input(link, iskb); -		break; -	case MSG_FRAGMENTER: -		link->stats.recv_fragments++; -		if (tipc_buf_append(&link->reasm_buf, &skb)) { -			link->stats.recv_fragmented++; -			tipc_data_input(link, skb); -		} else if (!link->reasm_buf) { -			tipc_link_reset(link); +		if (unlikely(!link_is_up(l))) { +			rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); +			if (!link_is_up(l)) { +				kfree_skb(__skb_dequeue(arrvq)); +				goto exit; +			}  		} -		break; -	case BCAST_PROTOCOL: -		tipc_link_sync_rcv(node, skb); -		break; -	default: -		break; -	}; + +		l->silent_intv_cnt = 0; + +		/* Forward queues and wake up waiting users */ +		if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) { +			tipc_link_advance_backlog(l, xmitq); +			if (unlikely(!skb_queue_empty(&l->wakeupq))) +				link_prepare_wakeup(l); +		} + +		/* Defer reception if there is a gap in the sequence */ +		seqno = msg_seqno(hdr); +		rcv_nxt = l->rcv_nxt; +		if (unlikely(less(rcv_nxt, seqno))) { +			l->stats.deferred_recv++; +			goto exit; +		} + +		__skb_dequeue(arrvq); + +		/* Drop if packet already received */ +		if (unlikely(more(rcv_nxt, seqno))) { +			l->stats.duplicates++; +			kfree_skb(skb); +			goto exit; +		} + +		/* Packet can be delivered */ +		l->rcv_nxt++; +		l->stats.recv_info++; +		if (unlikely(!tipc_data_input(l, skb, &tmpq))) +			rc = tipc_link_input(l, skb, &tmpq); + +		/* Ack at regular intervals */ +		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { +			l->rcv_unacked = 0; +			l->stats.sent_acks++; +			tipc_link_build_proto_msg(l, STATE_MSG, +						  0, 0, 0, 0, xmitq); +		} +	} +exit: +	tipc_skb_queue_splice_tail(&tmpq, l->inputq); +	return rc;  }  /** @@ -1255,458 +1170,250 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)  }  /* - * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet + * Send protocol message to the other endpoint.   */ -static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, -				       struct sk_buff *buf) +void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg, +			  u32 gap, u32 tolerance, u32 priority)  { -	u32 seq_no = buf_seqno(buf); +	struct sk_buff *skb = NULL; +	struct sk_buff_head xmitq; -	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { -		tipc_link_proto_rcv(l_ptr, buf); +	__skb_queue_head_init(&xmitq); +	tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap, +				  tolerance, priority, &xmitq); +	skb = __skb_dequeue(&xmitq); +	if (!skb)  		return; -	} - -	/* Record OOS packet arrival */ -	l_ptr->silent_intv_cnt = 0; +	tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); +	l->rcv_unacked = 0; +	kfree_skb(skb); +} -	/* -	 * Discard packet if a duplicate; otherwise add it to deferred queue -	 * and notify peer of gap as per protocol specification -	 */ -	if (less(seq_no, l_ptr->rcv_nxt)) { -		l_ptr->stats.duplicates++; -		kfree_skb(buf); +/* tipc_link_build_proto_msg: prepare link protocol message for transmission + */ +static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, +				      u16 rcvgap, int tolerance, int priority, +				      struct sk_buff_head *xmitq) +{ +	struct sk_buff *skb = NULL; +	struct tipc_msg *hdr = l->pmsg; +	u16 snd_nxt = l->snd_nxt; +	u16 rcv_nxt = l->rcv_nxt; +	u16 rcv_last = rcv_nxt - 1; +	int node_up = l->owner->bclink.recv_permitted; + +	/* Don't send protocol message during reset or link failover */ +	if (tipc_link_is_blocked(l))  		return; -	} -	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) { -		l_ptr->stats.deferred_recv++; -		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1) -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0); +	msg_set_type(hdr, mtyp); +	msg_set_net_plane(hdr, l->net_plane); +	msg_set_bcast_ack(hdr, l->owner->bclink.last_in); +	msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); +	msg_set_link_tolerance(hdr, tolerance); +	msg_set_linkprio(hdr, priority); +	msg_set_redundant_link(hdr, node_up); +	msg_set_seq_gap(hdr, 0); + +	/* Compatibility: created msg must not be in sequence with pkt flow */ +	msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); + +	if (mtyp == STATE_MSG) { +		if (!tipc_link_is_up(l)) +			return; +		msg_set_next_sent(hdr, snd_nxt); + +		/* Override rcvgap if there are packets in deferred queue */ +		if (!skb_queue_empty(&l->deferdq)) +			rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; +		if (rcvgap) { +			msg_set_seq_gap(hdr, rcvgap); +			l->stats.sent_nacks++; +		} +		msg_set_ack(hdr, rcv_last); +		msg_set_probe(hdr, probe); +		if (probe) +			l->stats.sent_probes++; +		l->stats.sent_states++;  	} else { -		l_ptr->stats.duplicates++; +		/* RESET_MSG or ACTIVATE_MSG */ +		msg_set_max_pkt(hdr, l->advertised_mtu); +		msg_set_ack(hdr, l->rcv_nxt - 1); +		msg_set_next_sent(hdr, 1);  	} +	skb = tipc_buf_acquire(msg_size(hdr)); +	if (!skb) +		return; +	skb_copy_to_linear_data(skb, hdr, msg_size(hdr)); +	skb->priority = TC_PRIO_CONTROL; +	__skb_queue_tail(xmitq, skb);  } -/* - * Send protocol message to the other endpoint. +/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets + * with contents of the link's tranmsit and backlog queues.   */ -void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, -			  u32 gap, u32 tolerance, u32 priority) +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, +			   int mtyp, struct sk_buff_head *xmitq)  { -	struct sk_buff *buf = NULL; -	struct tipc_msg *msg = l_ptr->pmsg; -	u32 msg_size = sizeof(l_ptr->proto_msg); -	int r_flag; -	u16 last_rcv; - -	/* Don't send protocol message during link failover */ -	if (l_ptr->flags & LINK_FAILINGOVER) -		return; +	struct sk_buff *skb, *tnlskb; +	struct tipc_msg *hdr, tnlhdr; +	struct sk_buff_head *queue = &l->transmq; +	struct sk_buff_head tmpxq, tnlq; +	u16 pktlen, pktcnt, seqno = l->snd_nxt; -	/* Abort non-RESET send if communication with node is prohibited */ -	if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG)) +	if (!tnl)  		return; -	/* Create protocol message with "out-of-sequence" sequence number */ -	msg_set_type(msg, msg_typ); -	msg_set_net_plane(msg, l_ptr->net_plane); -	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -	msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net)); - -	if (msg_typ == STATE_MSG) { -		u16 next_sent = l_ptr->snd_nxt; +	skb_queue_head_init(&tnlq); +	skb_queue_head_init(&tmpxq); -		if (!tipc_link_is_up(l_ptr)) +	/* At least one packet required for safe algorithm => add dummy */ +	skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, +			      BASIC_H_SIZE, 0, l->addr, link_own_addr(l), +			      0, 0, TIPC_ERR_NO_PORT); +	if (!skb) { +		pr_warn("%sunable to create tunnel packet\n", link_co_err); +		return; +	} +	skb_queue_tail(&tnlq, skb); +	tipc_link_xmit(l, &tnlq, &tmpxq); +	__skb_queue_purge(&tmpxq); + +	/* Initialize reusable tunnel packet header */ +	tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL, +		      mtyp, INT_H_SIZE, l->addr); +	pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq); +	msg_set_msgcnt(&tnlhdr, pktcnt); +	msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); +tnl: +	/* Wrap each packet into a tunnel packet */ +	skb_queue_walk(queue, skb) { +		hdr = buf_msg(skb); +		if (queue == &l->backlogq) +			msg_set_seqno(hdr, seqno++); +		pktlen = msg_size(hdr); +		msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); +		tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE); +		if (!tnlskb) { +			pr_warn("%sunable to send packet\n", link_co_err);  			return; -		msg_set_next_sent(msg, next_sent); -		if (!skb_queue_empty(&l_ptr->deferdq)) { -			last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq)); -			gap = mod(last_rcv - l_ptr->rcv_nxt);  		} -		msg_set_seq_gap(msg, gap); -		if (gap) -			l_ptr->stats.sent_nacks++; -		msg_set_link_tolerance(msg, tolerance); -		msg_set_linkprio(msg, priority); -		msg_set_max_pkt(msg, l_ptr->mtu); -		msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); -		msg_set_probe(msg, probe_msg != 0); -		if (probe_msg) -			l_ptr->stats.sent_probes++; -		l_ptr->stats.sent_states++; -	} else {		/* RESET_MSG or ACTIVATE_MSG */ -		msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1)); -		msg_set_seq_gap(msg, 0); -		msg_set_next_sent(msg, 1); -		msg_set_probe(msg, 0); -		msg_set_link_tolerance(msg, l_ptr->tolerance); -		msg_set_linkprio(msg, l_ptr->priority); -		msg_set_max_pkt(msg, l_ptr->advertised_mtu); +		skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE); +		skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen); +		__skb_queue_tail(&tnlq, tnlskb); +	} +	if (queue != &l->backlogq) { +		queue = &l->backlogq; +		goto tnl;  	} -	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); -	msg_set_redundant_link(msg, r_flag); -	msg_set_linkprio(msg, l_ptr->priority); -	msg_set_size(msg, msg_size); - -	msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2))); - -	buf = tipc_buf_acquire(msg_size); -	if (!buf) -		return; +	tipc_link_xmit(tnl, &tnlq, xmitq); -	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); -	buf->priority = TC_PRIO_CONTROL; -	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf, -			 &l_ptr->media_addr); -	l_ptr->rcv_unacked = 0; -	kfree_skb(buf); +	if (mtyp == FAILOVER_MSG) { +		tnl->drop_point = l->rcv_nxt; +		tnl->failover_reasm_skb = l->reasm_buf; +		l->reasm_buf = NULL; +	}  } -/* - * Receive protocol message : +/* tipc_link_proto_rcv(): receive link level protocol message :   * Note that network plane id propagates through the network, and may - * change at any time. The node with lowest address rules + * change at any time. The node with lowest numerical id determines + * network plane   */ -static void tipc_link_proto_rcv(struct tipc_link *l_ptr, -				struct sk_buff *buf) +static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, +			       struct sk_buff_head *xmitq)  { -	u32 rec_gap = 0; -	u32 msg_tol; -	struct tipc_msg *msg = buf_msg(buf); +	struct tipc_msg *hdr = buf_msg(skb); +	u16 rcvgap = 0; +	u16 nacked_gap = msg_seq_gap(hdr); +	u16 peers_snd_nxt =  msg_next_sent(hdr); +	u16 peers_tol = msg_link_tolerance(hdr); +	u16 peers_prio = msg_linkprio(hdr); +	u16 rcv_nxt = l->rcv_nxt; +	char *if_name; +	int rc = 0; -	if (l_ptr->flags & LINK_FAILINGOVER) +	if (tipc_link_is_blocked(l))  		goto exit; -	if (l_ptr->net_plane != msg_net_plane(msg)) -		if (link_own_addr(l_ptr) > msg_prevnode(msg)) -			l_ptr->net_plane = msg_net_plane(msg); - -	switch (msg_type(msg)) { +	if (link_own_addr(l) > msg_prevnode(hdr)) +		l->net_plane = msg_net_plane(hdr); +	switch (msg_type(hdr)) {  	case RESET_MSG: -		if (!link_working_unknown(l_ptr) && -		    (l_ptr->peer_session != INVALID_SESSION)) { -			if (less_eq(msg_session(msg), l_ptr->peer_session)) -				break; /* duplicate or old reset: ignore */ -		} - -		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) || -				link_working_unknown(l_ptr))) { -			/* -			 * peer has lost contact -- don't allow peer's links -			 * to reactivate before we recognize loss & clean up -			 */ -			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN; -		} - -		link_state_event(l_ptr, RESET_MSG); +		/* Ignore duplicate RESET with old session number */ +		if ((less_eq(msg_session(hdr), l->peer_session)) && +		    (l->peer_session != WILDCARD_SESSION)) +			break;  		/* fall thru' */ -	case ACTIVATE_MSG: -		/* Update link settings according other endpoint's values */ -		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); -		msg_tol = msg_link_tolerance(msg); -		if (msg_tol > l_ptr->tolerance) -			link_set_supervision_props(l_ptr, msg_tol); - -		if (msg_linkprio(msg) > l_ptr->priority) -			l_ptr->priority = msg_linkprio(msg); - -		if (l_ptr->mtu > msg_max_pkt(msg)) -			l_ptr->mtu = msg_max_pkt(msg); - -		/* Synchronize broadcast link info, if not done previously */ -		if (!tipc_node_is_up(l_ptr->owner)) { -			l_ptr->owner->bclink.last_sent = -				l_ptr->owner->bclink.last_in = -				msg_last_bcast(msg); -			l_ptr->owner->bclink.oos_state = 0; -		} - -		l_ptr->peer_session = msg_session(msg); -		l_ptr->peer_bearer_id = msg_bearer_id(msg); - -		if (msg_type(msg) == ACTIVATE_MSG) -			link_state_event(l_ptr, ACTIVATE_MSG); -		break; -	case STATE_MSG: +	case ACTIVATE_MSG: -		msg_tol = msg_link_tolerance(msg); -		if (msg_tol) -			link_set_supervision_props(l_ptr, msg_tol); - -		if (msg_linkprio(msg) && -		    (msg_linkprio(msg) != l_ptr->priority)) { -			pr_debug("%s<%s>, priority change %u->%u\n", -				 link_rst_msg, l_ptr->name, -				 l_ptr->priority, msg_linkprio(msg)); -			l_ptr->priority = msg_linkprio(msg); -			tipc_link_reset(l_ptr); /* Enforce change to take effect */ +		/* Complete own link name with peer's interface name */ +		if_name =  strrchr(l->name, ':') + 1; +		if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)  			break; -		} - -		/* Record reception; force mismatch at next timeout: */ -		l_ptr->silent_intv_cnt = 0; - -		link_state_event(l_ptr, TRAFFIC_MSG_EVT); -		l_ptr->stats.recv_states++; -		if (link_reset_unknown(l_ptr)) +		if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)  			break; +		strncpy(if_name, msg_data(hdr),	TIPC_MAX_IF_NAME); -		if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg))) -			rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt); +		/* Update own tolerance if peer indicates a non-zero value */ +		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) +			l->tolerance = peers_tol; -		if (msg_probe(msg)) -			l_ptr->stats.recv_probes++; +		/* Update own priority if peer's priority is higher */ +		if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI)) +			l->priority = peers_prio; -		/* Protocol message before retransmits, reduce loss risk */ -		if (l_ptr->owner->bclink.recv_permitted) -			tipc_bclink_update_link_state(l_ptr->owner, -						      msg_last_bcast(msg)); - -		if (rec_gap || (msg_probe(msg))) { -			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, -					     rec_gap, 0, 0); -		} -		if (msg_seq_gap(msg)) { -			l_ptr->stats.recv_nacks++; -			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq), -					     msg_seq_gap(msg)); +		if (msg_type(hdr) == RESET_MSG) { +			rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); +		} else if (!link_is_up(l)) { +			tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); +			rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);  		} +		l->peer_session = msg_session(hdr); +		l->peer_bearer_id = msg_bearer_id(hdr); +		if (l->mtu > msg_max_pkt(hdr)) +			l->mtu = msg_max_pkt(hdr);  		break; -	} -exit: -	kfree_skb(buf); -} - - -/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to - * a different bearer. Owner node is locked. - */ -static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, -				  struct tipc_msg *tunnel_hdr, -				  struct tipc_msg *msg, -				  u32 selector) -{ -	struct tipc_link *tunnel; -	struct sk_buff *skb; -	u32 length = msg_size(msg); - -	tunnel = l_ptr->owner->active_links[selector & 1]; -	if (!tipc_link_is_up(tunnel)) { -		pr_warn("%stunnel link no longer available\n", link_co_err); -		return; -	} -	msg_set_size(tunnel_hdr, length + INT_H_SIZE); -	skb = tipc_buf_acquire(length + INT_H_SIZE); -	if (!skb) { -		pr_warn("%sunable to send tunnel msg\n", link_co_err); -		return; -	} -	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE); -	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length); -	__tipc_link_xmit_skb(tunnel, skb); -} - - -/* tipc_link_failover_send_queue(): A link has gone down, but a second - * link is still active. We can do failover. Tunnel the failing link's - * whole send queue via the remaining link. This way, we don't lose - * any packets, and sequence order is preserved for subsequent traffic - * sent over the remaining link. Owner node is locked. - */ -void tipc_link_failover_send_queue(struct tipc_link *l_ptr) -{ -	int msgcount; -	struct tipc_link *tunnel = l_ptr->owner->active_links[0]; -	struct tipc_msg tunnel_hdr; -	struct sk_buff *skb; -	int split_bundles; - -	if (!tunnel) -		return; -	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL, -		      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr); - -	skb_queue_walk(&l_ptr->backlogq, skb) { -		msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt); -		l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1); -	} -	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq); -	tipc_link_purge_backlog(l_ptr); -	msgcount = skb_queue_len(&l_ptr->transmq); -	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); -	msg_set_msgcnt(&tunnel_hdr, msgcount); - -	if (skb_queue_empty(&l_ptr->transmq)) { -		skb = tipc_buf_acquire(INT_H_SIZE); -		if (skb) { -			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE); -			msg_set_size(&tunnel_hdr, INT_H_SIZE); -			__tipc_link_xmit_skb(tunnel, skb); -		} else { -			pr_warn("%sunable to send changeover msg\n", -				link_co_err); -		} -		return; -	} - -	split_bundles = (l_ptr->owner->active_links[0] != -			 l_ptr->owner->active_links[1]); +	case STATE_MSG: -	skb_queue_walk(&l_ptr->transmq, skb) { -		struct tipc_msg *msg = buf_msg(skb); +		/* Update own tolerance if peer indicates a non-zero value */ +		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL)) +			l->tolerance = peers_tol; -		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { -			struct tipc_msg *m = msg_get_wrapped(msg); -			unchar *pos = (unchar *)m; +		l->silent_intv_cnt = 0; +		l->stats.recv_states++; +		if (msg_probe(hdr)) +			l->stats.recv_probes++; +		rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); +		if (!link_is_up(l)) +			break; -			msgcount = msg_msgcnt(msg); -			while (msgcount--) { -				msg_set_seqno(m, msg_seqno(msg)); -				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, -						      msg_link_selector(m)); -				pos += align(msg_size(m)); -				m = (struct tipc_msg *)pos; -			} -		} else { -			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, -					      msg_link_selector(msg)); +		/* Send NACK if peer has sent pkts we haven't received yet */ +		if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) +			rcvgap = peers_snd_nxt - l->rcv_nxt; +		if (rcvgap || (msg_probe(hdr))) +			tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, +						  0, 0, xmitq); +		tipc_link_release_pkts(l, msg_ack(hdr)); + +		/* If NACK, retransmit will now start at right position */ +		if (nacked_gap) { +			rc = tipc_link_retransm(l, nacked_gap, xmitq); +			l->stats.recv_nacks++;  		} -	} -} - -/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a - * duplicate of the first link's send queue via the new link. This way, we - * are guaranteed that currently queued packets from a socket are delivered - * before future traffic from the same socket, even if this is using the - * new link. The last arriving copy of each duplicate packet is dropped at - * the receiving end by the regular protocol check, so packet cardinality - * and sequence order is preserved per sender/receiver socket pair. - * Owner node is locked. - */ -void tipc_link_dup_queue_xmit(struct tipc_link *link, -			      struct tipc_link *tnl) -{ -	struct sk_buff *skb; -	struct tipc_msg tnl_hdr; -	struct sk_buff_head *queue = &link->transmq; -	int mcnt; -	u16 seqno; - -	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL, -		      SYNCH_MSG, INT_H_SIZE, link->addr); -	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq); -	msg_set_msgcnt(&tnl_hdr, mcnt); -	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id); - -tunnel_queue: -	skb_queue_walk(queue, skb) { -		struct sk_buff *outskb; -		struct tipc_msg *msg = buf_msg(skb); -		u32 len = msg_size(msg); -		msg_set_ack(msg, mod(link->rcv_nxt - 1)); -		msg_set_bcast_ack(msg, link->owner->bclink.last_in); -		msg_set_size(&tnl_hdr, len + INT_H_SIZE); -		outskb = tipc_buf_acquire(len + INT_H_SIZE); -		if (outskb == NULL) { -			pr_warn("%sunable to send duplicate msg\n", -				link_co_err); -			return; -		} -		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE); -		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, -					       skb->data, len); -		__tipc_link_xmit_skb(tnl, outskb); -		if (!tipc_link_is_up(link)) -			return; -	} -	if (queue == &link->backlogq) -		return; -	seqno = link->snd_nxt; -	skb_queue_walk(&link->backlogq, skb) { -		msg_set_seqno(buf_msg(skb), seqno); -		seqno = mod(seqno + 1); -	} -	queue = &link->backlogq; -	goto tunnel_queue; -} - -/*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet - *  Owner node is locked. - */ -static bool tipc_link_failover_rcv(struct tipc_link *link, -				   struct sk_buff **skb) -{ -	struct tipc_msg *msg = buf_msg(*skb); -	struct sk_buff *iskb = NULL; -	struct tipc_link *pl = NULL; -	int bearer_id = msg_bearer_id(msg); -	int pos = 0; - -	if (msg_type(msg) != FAILOVER_MSG) { -		pr_warn("%sunknown tunnel pkt received\n", link_co_err); -		goto exit; -	} -	if (bearer_id >= MAX_BEARERS) -		goto exit; - -	if (bearer_id == link->bearer_id) -		goto exit; - -	pl = link->owner->links[bearer_id]; -	if (pl && tipc_link_is_up(pl)) -		tipc_link_reset(pl); - -	if (link->failover_pkts == FIRST_FAILOVER) -		link->failover_pkts = msg_msgcnt(msg); - -	/* Should we expect an inner packet? */ -	if (!link->failover_pkts) -		goto exit; - -	if (!tipc_msg_extract(*skb, &iskb, &pos)) { -		pr_warn("%sno inner failover pkt\n", link_co_err); -		*skb = NULL; -		goto exit; -	} -	link->failover_pkts--; -	*skb = NULL; - -	/* Was this packet already delivered? */ -	if (less(buf_seqno(iskb), link->failover_checkpt)) { -		kfree_skb(iskb); -		iskb = NULL; -		goto exit; -	} -	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { -		link->stats.recv_fragments++; -		tipc_buf_append(&link->failover_skb, &iskb); +		tipc_link_advance_backlog(l, xmitq); +		if (unlikely(!skb_queue_empty(&l->wakeupq))) +			link_prepare_wakeup(l);  	}  exit: -	if (!link->failover_pkts && pl) -		pl->flags &= ~LINK_FAILINGOVER; -	kfree_skb(*skb); -	*skb = iskb; -	return *skb; -} - -static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) -{ -	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; - -	if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL)) -		return; - -	l_ptr->tolerance = tol; -	l_ptr->keepalive_intv = msecs_to_jiffies(intv); -	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv)); +	kfree_skb(skb); +	return rc;  }  void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) @@ -1743,7 +1450,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,  	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {  		tipc_node_lock(n_ptr);  		for (i = 0; i < MAX_BEARERS; i++) { -			l_ptr = n_ptr->links[i]; +			l_ptr = n_ptr->links[i].link;  			if (l_ptr && !strcmp(l_ptr->name, link_name)) {  				*bearer_id = i;  				found_node = n_ptr; @@ -1770,27 +1477,16 @@ static void link_reset_statistics(struct tipc_link *l_ptr)  	l_ptr->stats.recv_info = l_ptr->rcv_nxt;  } -static void link_print(struct tipc_link *l_ptr, const char *str) +static void link_print(struct tipc_link *l, const char *str)  { -	struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id); -	struct tipc_bearer *b_ptr; - -	rcu_read_lock(); -	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]); -	if (b_ptr) -		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name); -	rcu_read_unlock(); - -	if (link_working_unknown(l_ptr)) -		pr_cont(":WU\n"); -	else if (link_reset_reset(l_ptr)) -		pr_cont(":RR\n"); -	else if (link_reset_unknown(l_ptr)) -		pr_cont(":RU\n"); -	else if (link_working_working(l_ptr)) -		pr_cont(":WW\n"); -	else -		pr_cont("\n"); +	struct sk_buff *hskb = skb_peek(&l->transmq); +	u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; +	u16 tail = l->snd_nxt - 1; + +	pr_info("%s Link <%s> state %x\n", str, l->name, l->state); +	pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n", +		skb_queue_len(&l->transmq), head, tail, +		skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);  }  /* Parse and validate nested (link) properties valid for media, bearer and link @@ -1865,7 +1561,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)  	tipc_node_lock(node); -	link = node->links[bearer_id]; +	link = node->links[bearer_id].link;  	if (!link) {  		res = -EINVAL;  		goto out; @@ -1885,7 +1581,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)  			u32 tol;  			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]); -			link_set_supervision_props(link, tol); +			link->tolerance = tol;  			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);  		}  		if (props[TIPC_NLA_PROP_PRIO]) { @@ -2055,10 +1751,11 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,  	for (i = *prev_link; i < MAX_BEARERS; i++) {  		*prev_link = i; -		if (!node->links[i]) +		if (!node->links[i].link)  			continue; -		err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI); +		err = __tipc_nl_add_link(net, msg, +					 node->links[i].link, NLM_F_MULTI);  		if (err)  			return err;  	} @@ -2172,7 +1869,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)  			return -EINVAL;  		tipc_node_lock(node); -		link = node->links[bearer_id]; +		link = node->links[bearer_id].link;  		if (!link) {  			tipc_node_unlock(node);  			nlmsg_free(msg.skb); @@ -2227,7 +1924,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)  	tipc_node_lock(node); -	link = node->links[bearer_id]; +	link = node->links[bearer_id].link;  	if (!link) {  		tipc_node_unlock(node);  		return -EINVAL; diff --git a/net/tipc/link.h b/net/tipc/link.h index ae0a0ea572f2..39ff8b6919a4 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -49,19 +49,25 @@   */  #define INVALID_LINK_SEQ 0x10000 -/* Link working states +/* Link FSM events:   */ -#define WORKING_WORKING 560810u -#define WORKING_UNKNOWN 560811u -#define RESET_UNKNOWN   560812u -#define RESET_RESET     560813u +enum { +	LINK_ESTABLISH_EVT       = 0xec1ab1e, +	LINK_PEER_RESET_EVT      = 0x9eed0e, +	LINK_FAILURE_EVT         = 0xfa110e, +	LINK_RESET_EVT           = 0x10ca1d0e, +	LINK_FAILOVER_BEGIN_EVT  = 0xfa110bee, +	LINK_FAILOVER_END_EVT    = 0xfa110ede, +	LINK_SYNCH_BEGIN_EVT     = 0xc1ccbee, +	LINK_SYNCH_END_EVT       = 0xc1ccede +}; -/* Link endpoint execution states +/* Events returned from link at packet reception or at timeout   */ -#define LINK_STARTED     0x0001 -#define LINK_STOPPED     0x0002 -#define LINK_SYNCHING    0x0004 -#define LINK_FAILINGOVER 0x0008 +enum { +	TIPC_LINK_UP_EVT       = 1, +	TIPC_LINK_DOWN_EVT     = (1 << 1) +};  /* Starting value for maximum packet size negotiation on unicast links   * (unless bearer MTU is less) @@ -106,7 +112,6 @@ struct tipc_stats {   * @timer: link timer   * @owner: pointer to peer node   * @refcnt: reference counter for permanent references (owner node & timer) - * @flags: execution state flags for link endpoint instance   * @peer_session: link session # being used by peer end of link   * @peer_bearer_id: bearer id used by link's peer endpoint   * @bearer_id: local bearer id used by link @@ -143,20 +148,17 @@ struct tipc_stats {  struct tipc_link {  	u32 addr;  	char name[TIPC_MAX_LINK_NAME]; -	struct tipc_media_addr media_addr; -	struct timer_list timer; +	struct tipc_media_addr *media_addr;  	struct tipc_node *owner; -	struct kref ref;  	/* Management and link supervision data */ -	unsigned int flags;  	u32 peer_session;  	u32 peer_bearer_id;  	u32 bearer_id;  	u32 tolerance;  	unsigned long keepalive_intv;  	u32 abort_limit; -	int state; +	u32 state;  	u32 silent_intv_cnt;  	struct {  		unchar hdr[INT_H_SIZE]; @@ -165,12 +167,10 @@ struct tipc_link {  	struct tipc_msg *pmsg;  	u32 priority;  	char net_plane; -	u16 synch_point; -	/* Failover */ -	u16 failover_pkts; -	u16 failover_checkpt; -	struct sk_buff *failover_skb; +	/* Failover/synch */ +	u16 drop_point; +	struct sk_buff *failover_reasm_skb;  	/* Max packet negotiation */  	u16 mtu; @@ -192,8 +192,8 @@ struct tipc_link {  	u16 rcv_nxt;  	u32 rcv_unacked;  	struct sk_buff_head deferdq; -	struct sk_buff_head inputq; -	struct sk_buff_head namedq; +	struct sk_buff_head *inputq; +	struct sk_buff_head *namedq;  	/* Congestion handling */  	struct sk_buff_head wakeupq; @@ -205,28 +205,29 @@ struct tipc_link {  	struct tipc_stats stats;  }; -struct tipc_port; - -struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, -			      struct tipc_bearer *b_ptr, -			      const struct tipc_media_addr *media_addr); -void tipc_link_delete(struct tipc_link *link); -void tipc_link_delete_list(struct net *net, unsigned int bearer_id); -void tipc_link_failover_send_queue(struct tipc_link *l_ptr); -void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest); +bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, +		      u32 ownnode, u32 peer, struct tipc_media_addr *maddr, +		      struct sk_buff_head *inputq, struct sk_buff_head *namedq, +		      struct tipc_link **link); +void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, +			   int mtyp, struct sk_buff_head *xmitq); +void tipc_link_build_bcast_sync_msg(struct tipc_link *l, +				    struct sk_buff_head *xmitq); +int tipc_link_fsm_evt(struct tipc_link *l, int evt);  void tipc_link_reset_fragments(struct tipc_link *l_ptr); -int tipc_link_is_up(struct tipc_link *l_ptr); +bool tipc_link_is_up(struct tipc_link *l); +bool tipc_link_is_reset(struct tipc_link *l); +bool tipc_link_is_synching(struct tipc_link *l); +bool tipc_link_is_failingover(struct tipc_link *l); +bool tipc_link_is_blocked(struct tipc_link *l);  int tipc_link_is_active(struct tipc_link *l_ptr);  void tipc_link_purge_queues(struct tipc_link *l_ptr);  void tipc_link_purge_backlog(struct tipc_link *l); -void tipc_link_reset_all(struct tipc_node *node);  void tipc_link_reset(struct tipc_link *l_ptr); -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, -		       u32 selector); -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest, -		   u32 selector);  int __tipc_link_xmit(struct net *net, struct tipc_link *link,  		     struct sk_buff_head *list); +int tipc_link_xmit(struct tipc_link *link,	struct sk_buff_head *list, +		   struct sk_buff_head *xmitq);  void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,  			  u32 gap, u32 tolerance, u32 priority);  void tipc_link_push_packets(struct tipc_link *l_ptr); @@ -242,34 +243,8 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);  int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);  int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);  int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); -void link_prepare_wakeup(struct tipc_link *l); - -static inline u32 link_own_addr(struct tipc_link *l) -{ -	return msg_prevnode(l->pmsg); -} - -/* - * Link status checking routines - */ -static inline int link_working_working(struct tipc_link *l_ptr) -{ -	return l_ptr->state == WORKING_WORKING; -} - -static inline int link_working_unknown(struct tipc_link *l_ptr) -{ -	return l_ptr->state == WORKING_UNKNOWN; -} - -static inline int link_reset_unknown(struct tipc_link *l_ptr) -{ -	return l_ptr->state == RESET_UNKNOWN; -} - -static inline int link_reset_reset(struct tipc_link *l_ptr) -{ -	return l_ptr->state == RESET_RESET; -} +int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); +int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, +		  struct sk_buff_head *xmitq);  #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 08b4cc7d496d..562c926a51cc 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -463,60 +463,72 @@ bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,  /**   * tipc_msg_reverse(): swap source and destination addresses and add error code - * @buf:  buffer containing message to be reversed - * @dnode: return value: node where to send message after reversal - * @err:  error code to be set in message - * Consumes buffer if failure + * @own_node: originating node id for reversed message + * @skb:  buffer containing message to be reversed; may be replaced. + * @err:  error code to be set in message, if any + * Consumes buffer at failure   * Returns true if success, otherwise false   */ -bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode, -		      int err) +bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, int err)  { -	struct tipc_msg *msg = buf_msg(buf); +	struct sk_buff *_skb = *skb; +	struct tipc_msg *hdr = buf_msg(_skb);  	struct tipc_msg ohdr; -	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE); +	int dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE); -	if (skb_linearize(buf)) +	if (skb_linearize(_skb))  		goto exit; -	msg = buf_msg(buf); -	if (msg_dest_droppable(msg)) +	hdr = buf_msg(_skb); +	if (msg_dest_droppable(hdr))  		goto exit; -	if (msg_errcode(msg)) +	if (msg_errcode(hdr))  		goto exit; -	memcpy(&ohdr, msg, msg_hdr_sz(msg)); -	msg_set_errcode(msg, err); -	msg_set_origport(msg, msg_destport(&ohdr)); -	msg_set_destport(msg, msg_origport(&ohdr)); -	msg_set_prevnode(msg, own_addr); -	if (!msg_short(msg)) { -		msg_set_orignode(msg, msg_destnode(&ohdr)); -		msg_set_destnode(msg, msg_orignode(&ohdr)); + +	/* Take a copy of original header before altering message */ +	memcpy(&ohdr, hdr, msg_hdr_sz(hdr)); + +	/* Never return SHORT header; expand by replacing buffer if necessary */ +	if (msg_short(hdr)) { +		*skb = tipc_buf_acquire(BASIC_H_SIZE + dlen); +		if (!*skb) +			goto exit; +		memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen); +		kfree_skb(_skb); +		_skb = *skb; +		hdr = buf_msg(_skb); +		memcpy(hdr, &ohdr, BASIC_H_SIZE); +		msg_set_hdr_sz(hdr, BASIC_H_SIZE);  	} -	msg_set_size(msg, msg_hdr_sz(msg) + rdsz); -	skb_trim(buf, msg_size(msg)); -	skb_orphan(buf); -	*dnode = msg_orignode(&ohdr); + +	/* Now reverse the concerned fields */ +	msg_set_errcode(hdr, err); +	msg_set_origport(hdr, msg_destport(&ohdr)); +	msg_set_destport(hdr, msg_origport(&ohdr)); +	msg_set_destnode(hdr, msg_prevnode(&ohdr)); +	msg_set_prevnode(hdr, own_node); +	msg_set_orignode(hdr, own_node); +	msg_set_size(hdr, msg_hdr_sz(hdr) + dlen); +	skb_trim(_skb, msg_size(hdr)); +	skb_orphan(_skb);  	return true;  exit: -	kfree_skb(buf); -	*dnode = 0; +	kfree_skb(_skb); +	*skb = NULL;  	return false;  }  /**   * tipc_msg_lookup_dest(): try to find new destination for named message   * @skb: the buffer containing the message. - * @dnode: return value: next-hop node, if destination found - * @err: return value: error code to use, if message to be rejected + * @err: error code to be used by caller if lookup fails   * Does not consume buffer   * Returns true if a destination is found, false otherwise   */ -bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, -			  u32 *dnode, int *err) +bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)  {  	struct tipc_msg *msg = buf_msg(skb); -	u32 dport; -	u32 own_addr = tipc_own_addr(net); +	u32 dport, dnode; +	u32 onode = tipc_own_addr(net);  	if (!msg_isdata(msg))  		return false; @@ -529,15 +541,15 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,  		return false;  	if (msg_reroute_cnt(msg))  		return false; -	*dnode = addr_domain(net, msg_lookup_scope(msg)); +	dnode = addr_domain(net, msg_lookup_scope(msg));  	dport = tipc_nametbl_translate(net, msg_nametype(msg), -				       msg_nameinst(msg), dnode); +				       msg_nameinst(msg), &dnode);  	if (!dport)  		return false;  	msg_incr_reroute_cnt(msg); -	if (*dnode != own_addr) -		msg_set_prevnode(msg, own_addr); -	msg_set_destnode(msg, *dnode); +	if (dnode != onode) +		msg_set_prevnode(msg, onode); +	msg_set_destnode(msg, dnode);  	msg_set_destport(msg, dport);  	*err = TIPC_OK;  	return true; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 19c45fb66238..a82c5848d4bc 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -38,6 +38,7 @@  #define _TIPC_MSG_H  #include <linux/tipc.h> +#include "core.h"  /*   * Constants and routines used to read and write TIPC payload message headers @@ -109,7 +110,6 @@ struct tipc_skb_cb {  	struct sk_buff *tail;  	bool validated;  	bool wakeup_pending; -	bool bundling;  	u16 chain_sz;  	u16 chain_imp;  }; @@ -558,15 +558,6 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 1, 15, 0x1fff, n);  } -static inline bool msg_dup(struct tipc_msg *m) -{ -	if (likely(msg_user(m) != TUNNEL_PROTOCOL)) -		return false; -	if (msg_type(m) != SYNCH_MSG) -		return false; -	return true; -} -  /*   * Word 2   */ @@ -620,12 +611,12 @@ static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)  } -static inline u32 msg_next_sent(struct tipc_msg *m) +static inline u16 msg_next_sent(struct tipc_msg *m)  {  	return msg_bits(m, 4, 0, 0xffff);  } -static inline void msg_set_next_sent(struct tipc_msg *m, u32 n) +static inline void msg_set_next_sent(struct tipc_msg *m, u16 n)  {  	msg_set_bits(m, 4, 0, 0xffff, n);  } @@ -658,12 +649,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)  /*   * Word 5   */ -static inline u32 msg_session(struct tipc_msg *m) +static inline u16 msg_session(struct tipc_msg *m)  {  	return msg_bits(m, 5, 16, 0xffff);  } -static inline void msg_set_session(struct tipc_msg *m, u32 n) +static inline void msg_set_session(struct tipc_msg *m, u16 n)  {  	msg_set_bits(m, 5, 16, 0xffff, n);  } @@ -726,12 +717,12 @@ static inline char *msg_media_addr(struct tipc_msg *m)  /*   * Word 9   */ -static inline u32 msg_msgcnt(struct tipc_msg *m) +static inline u16 msg_msgcnt(struct tipc_msg *m)  {  	return msg_bits(m, 9, 16, 0xffff);  } -static inline void msg_set_msgcnt(struct tipc_msg *m, u32 n) +static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)  {  	msg_set_bits(m, 9, 16, 0xffff, n);  } @@ -766,10 +757,25 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 9, 0, 0xffff, n);  } +static inline bool msg_peer_link_is_up(struct tipc_msg *m) +{ +	if (likely(msg_user(m) != LINK_PROTOCOL)) +		return true; +	if (msg_type(m) == STATE_MSG) +		return true; +	return false; +} + +static inline bool msg_peer_node_is_up(struct tipc_msg *m) +{ +	if (msg_peer_link_is_up(m)) +		return true; +	return msg_redundant_link(m); +} +  struct sk_buff *tipc_buf_acquire(u32 size);  bool tipc_msg_validate(struct sk_buff *skb); -bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, -		      int err); +bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);  void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,  		   u32 hsize, u32 destnode);  struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, @@ -782,8 +788,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,  bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);  int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  		   int offset, int dsz, int mtu, struct sk_buff_head *list); -bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, -			  int *err); +bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);  struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);  static inline u16 buf_seqno(struct sk_buff *skb) @@ -857,26 +862,65 @@ static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,  	return skb;  } -/* tipc_skb_queue_tail(): add buffer to tail of list; +/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number   * @list: list to be appended to - * @skb: buffer to append. Always appended - * @dport: the destination port of the buffer - * returns true if dport differs from previous destination + * @skb: buffer to add + * Returns true if queue should treated further, otherwise false   */ -static inline bool tipc_skb_queue_tail(struct sk_buff_head *list, -				       struct sk_buff *skb, u32 dport) +static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list, +					   struct sk_buff *skb)  { -	struct sk_buff *_skb = NULL; -	bool rv = false; +	struct sk_buff *_skb, *tmp; +	struct tipc_msg *hdr = buf_msg(skb); +	u16 seqno = msg_seqno(hdr); -	spin_lock_bh(&list->lock); -	_skb = skb_peek_tail(list); -	if (!_skb || (msg_destport(buf_msg(_skb)) != dport) || -	    (skb_queue_len(list) > 32)) -		rv = true; +	if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) { +		__skb_queue_head(list, skb); +		return true; +	} +	if (likely(less(seqno, buf_seqno(skb_peek(list))))) { +		__skb_queue_head(list, skb); +		return true; +	} +	if (!more(seqno, buf_seqno(skb_peek_tail(list)))) { +		skb_queue_walk_safe(list, _skb, tmp) { +			if (likely(less(seqno, buf_seqno(_skb)))) { +				__skb_queue_before(list, _skb, skb); +				return true; +			} +		} +	}  	__skb_queue_tail(list, skb); +	return false; +} + +/* tipc_skb_queue_splice_tail - append an skb list to lock protected list + * @list: the new list to append. Not lock protected + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail(struct sk_buff_head *list, +					      struct sk_buff_head *head) +{ +	spin_lock_bh(&head->lock); +	skb_queue_splice_tail(list, head); +	spin_unlock_bh(&head->lock); +} + +/* tipc_skb_queue_splice_tail_init - merge two lock protected skb lists + * @list: the new list to add. Lock protected. Will be reinitialized + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list, +						   struct sk_buff_head *head) +{ +	struct sk_buff_head tmp; + +	__skb_queue_head_init(&tmp); + +	spin_lock_bh(&list->lock); +	skb_queue_splice_tail_init(list, &tmp);  	spin_unlock_bh(&list->lock); -	return rv; +	tipc_skb_queue_splice_tail(&tmp, head);  }  #endif diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 41e7b7e4dda0..e6018b7eb197 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -96,13 +96,13 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)  		dnode = node->addr;  		if (in_own_node(net, dnode))  			continue; -		if (!tipc_node_active_links(node)) +		if (!tipc_node_is_up(node))  			continue;  		oskb = pskb_copy(skb, GFP_ATOMIC);  		if (!oskb)  			break;  		msg_set_destnode(buf_msg(oskb), dnode); -		tipc_link_xmit_skb(net, oskb, dnode, dnode); +		tipc_node_xmit_skb(net, oskb, dnode, dnode);  	}  	rcu_read_unlock(); @@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)  			 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);  	rcu_read_unlock(); -	tipc_link_xmit(net, &head, dnode, dnode); +	tipc_node_xmit(net, &head, dnode, dnode);  }  static void tipc_publ_subscribe(struct net *net, struct publication *publ, diff --git a/net/tipc/netlink_compat.c b/net/tipc/netlink_compat.c index 53e0fee80086..1eadc95e1132 100644 --- a/net/tipc/netlink_compat.c +++ b/net/tipc/netlink_compat.c @@ -1114,7 +1114,7 @@ static int tipc_nl_compat_recv(struct sk_buff *skb, struct genl_info *info)  	}  	len = nlmsg_attrlen(req_nlh, GENL_HDRLEN + TIPC_GENL_HDRLEN); -	if (TLV_GET_LEN(msg.req) && !TLV_OK(msg.req, len)) { +	if (len && !TLV_OK(msg.req, len)) {  		msg.rep = tipc_get_err_tlv(TIPC_CFG_NOT_SUPPORTED);  		err = -EOPNOTSUPP;  		goto send; diff --git a/net/tipc/node.c b/net/tipc/node.c index 0b1d61a5f853..703875fd6cde 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -40,10 +40,42 @@  #include "name_distr.h"  #include "socket.h"  #include "bcast.h" +#include "discover.h" -static void node_lost_contact(struct tipc_node *n_ptr); +/* Node FSM states and events: + */ +enum { +	SELF_DOWN_PEER_DOWN    = 0xdd, +	SELF_UP_PEER_UP        = 0xaa, +	SELF_DOWN_PEER_LEAVING = 0xd1, +	SELF_UP_PEER_COMING    = 0xac, +	SELF_COMING_PEER_UP    = 0xca, +	SELF_LEAVING_PEER_DOWN = 0x1d, +	NODE_FAILINGOVER       = 0xf0, +	NODE_SYNCHING          = 0xcc +}; + +enum { +	SELF_ESTABL_CONTACT_EVT = 0xece, +	SELF_LOST_CONTACT_EVT   = 0x1ce, +	PEER_ESTABL_CONTACT_EVT = 0x9ece, +	PEER_LOST_CONTACT_EVT   = 0x91ce, +	NODE_FAILOVER_BEGIN_EVT = 0xfbe, +	NODE_FAILOVER_END_EVT   = 0xfee, +	NODE_SYNCH_BEGIN_EVT    = 0xcbe, +	NODE_SYNCH_END_EVT      = 0xcee +}; + +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, +				  struct sk_buff_head *xmitq, +				  struct tipc_media_addr **maddr); +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, +				bool delete); +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);  static void node_established_contact(struct tipc_node *n_ptr);  static void tipc_node_delete(struct tipc_node *node); +static void tipc_node_timeout(unsigned long data); +static void tipc_node_fsm_evt(struct tipc_node *n, int evt);  struct tipc_sock_conn {  	u32 port; @@ -110,7 +142,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)  	return NULL;  } -struct tipc_node *tipc_node_create(struct net *net, u32 addr) +struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)  {  	struct tipc_net *tn = net_generic(net, tipc_net_id);  	struct tipc_node *n_ptr, *temp_node; @@ -126,12 +158,14 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)  	}  	n_ptr->addr = addr;  	n_ptr->net = net; +	n_ptr->capabilities = capabilities;  	kref_init(&n_ptr->kref);  	spin_lock_init(&n_ptr->lock);  	INIT_HLIST_NODE(&n_ptr->hash);  	INIT_LIST_HEAD(&n_ptr->list);  	INIT_LIST_HEAD(&n_ptr->publ_list);  	INIT_LIST_HEAD(&n_ptr->conn_sks); +	skb_queue_head_init(&n_ptr->bclink.namedq);  	__skb_queue_head_init(&n_ptr->bclink.deferdq);  	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);  	list_for_each_entry_rcu(temp_node, &tn->node_list, list) { @@ -139,14 +173,32 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)  			break;  	}  	list_add_tail_rcu(&n_ptr->list, &temp_node->list); -	n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN; +	n_ptr->state = SELF_DOWN_PEER_LEAVING;  	n_ptr->signature = INVALID_NODE_SIG; +	n_ptr->active_links[0] = INVALID_BEARER_ID; +	n_ptr->active_links[1] = INVALID_BEARER_ID;  	tipc_node_get(n_ptr); +	setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); +	n_ptr->keepalive_intv = U32_MAX;  exit:  	spin_unlock_bh(&tn->node_list_lock);  	return n_ptr;  } +static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l) +{ +	unsigned long tol = l->tolerance; +	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; +	unsigned long keepalive_intv = msecs_to_jiffies(intv); + +	/* Link with lowest tolerance determines timer interval */ +	if (keepalive_intv < n->keepalive_intv) +		n->keepalive_intv = keepalive_intv; + +	/* Ensure link's abort limit corresponds to current interval */ +	l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv); +} +  static void tipc_node_delete(struct tipc_node *node)  {  	list_del_rcu(&node->list); @@ -160,8 +212,11 @@ void tipc_node_stop(struct net *net)  	struct tipc_node *node, *t_node;  	spin_lock_bh(&tn->node_list_lock); -	list_for_each_entry_safe(node, t_node, &tn->node_list, list) +	list_for_each_entry_safe(node, t_node, &tn->node_list, list) { +		if (del_timer(&node->timer)) +			tipc_node_put(node);  		tipc_node_put(node); +	}  	spin_unlock_bh(&tn->node_list_lock);  } @@ -219,158 +274,551 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)  	tipc_node_put(node);  } +/* tipc_node_timeout - handle expiration of node timer + */ +static void tipc_node_timeout(unsigned long data) +{ +	struct tipc_node *n = (struct tipc_node *)data; +	struct tipc_link_entry *le; +	struct sk_buff_head xmitq; +	int bearer_id; +	int rc = 0; + +	__skb_queue_head_init(&xmitq); + +	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { +		tipc_node_lock(n); +		le = &n->links[bearer_id]; +		if (le->link) { +			/* Link tolerance may change asynchronously: */ +			tipc_node_calculate_timer(n, le->link); +			rc = tipc_link_timeout(le->link, &xmitq); +		} +		tipc_node_unlock(n); +		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); +		if (rc & TIPC_LINK_DOWN_EVT) +			tipc_node_link_down(n, bearer_id, false); +	} +	if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) +		tipc_node_get(n); +	tipc_node_put(n); +} +  /** - * tipc_node_link_up - handle addition of link - * + * __tipc_node_link_up - handle addition of link + * Node lock must be held by caller   * Link becomes active (alone or shared) or standby, depending on its priority.   */ -void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, +				struct sk_buff_head *xmitq)  { -	struct tipc_link **active = &n_ptr->active_links[0]; +	int *slot0 = &n->active_links[0]; +	int *slot1 = &n->active_links[1]; +	struct tipc_link *ol = node_active_link(n, 0); +	struct tipc_link *nl = n->links[bearer_id].link; -	n_ptr->working_links++; -	n_ptr->action_flags |= TIPC_NOTIFY_LINK_UP; -	n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; +	if (!nl || !tipc_link_is_up(nl)) +		return; -	pr_debug("Established link <%s> on network plane %c\n", -		 l_ptr->name, l_ptr->net_plane); +	n->working_links++; +	n->action_flags |= TIPC_NOTIFY_LINK_UP; +	n->link_id = nl->peer_bearer_id << 16 | bearer_id; -	if (!active[0]) { -		active[0] = active[1] = l_ptr; -		node_established_contact(n_ptr); -		goto exit; -	} -	if (l_ptr->priority < active[0]->priority) { -		pr_debug("New link <%s> becomes standby\n", l_ptr->name); -		goto exit; +	/* Leave room for tunnel header when returning 'mtu' to users: */ +	n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; + +	tipc_bearer_add_dest(n->net, bearer_id, n->addr); + +	pr_debug("Established link <%s> on network plane %c\n", +		 nl->name, nl->net_plane); + +	/* First link? => give it both slots */ +	if (!ol) { +		*slot0 = bearer_id; +		*slot1 = bearer_id; +		tipc_link_build_bcast_sync_msg(nl, xmitq); +		node_established_contact(n); +		return;  	} -	tipc_link_dup_queue_xmit(active[0], l_ptr); -	if (l_ptr->priority == active[0]->priority) { -		active[0] = l_ptr; -		goto exit; + +	/* Second link => redistribute slots */ +	if (nl->priority > ol->priority) { +		pr_debug("Old link <%s> becomes standby\n", ol->name); +		*slot0 = bearer_id; +		*slot1 = bearer_id; +	} else if (nl->priority == ol->priority) { +		*slot0 = bearer_id; +	} else { +		pr_debug("New link <%s> is standby\n", nl->name);  	} -	pr_debug("Old link <%s> becomes standby\n", active[0]->name); -	if (active[1] != active[0]) -		pr_debug("Old link <%s> becomes standby\n", active[1]->name); -	active[0] = active[1] = l_ptr; -exit: -	/* Leave room for changeover header when returning 'mtu' to users: */ -	n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; -	n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; + +	/* Prepare synchronization with first link */ +	tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);  }  /** - * node_select_active_links - select active link + * tipc_node_link_up - handle addition of link + * + * Link becomes active (alone or shared) or standby, depending on its priority.   */ -static void node_select_active_links(struct tipc_node *n_ptr) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, +			      struct sk_buff_head *xmitq)  { -	struct tipc_link **active = &n_ptr->active_links[0]; -	u32 i; -	u32 highest_prio = 0; +	tipc_node_lock(n); +	__tipc_node_link_up(n, bearer_id, xmitq); +	tipc_node_unlock(n); +} -	active[0] = active[1] = NULL; +/** + * __tipc_node_link_down - handle loss of link + */ +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, +				  struct sk_buff_head *xmitq, +				  struct tipc_media_addr **maddr) +{ +	struct tipc_link_entry *le = &n->links[*bearer_id]; +	int *slot0 = &n->active_links[0]; +	int *slot1 = &n->active_links[1]; +	int i, highest = 0; +	struct tipc_link *l, *_l, *tnl; + +	l = n->links[*bearer_id].link; +	if (!l || tipc_link_is_reset(l)) +		return; -	for (i = 0; i < MAX_BEARERS; i++) { -		struct tipc_link *l_ptr = n_ptr->links[i]; +	n->working_links--; +	n->action_flags |= TIPC_NOTIFY_LINK_DOWN; +	n->link_id = l->peer_bearer_id << 16 | *bearer_id; -		if (!l_ptr || !tipc_link_is_up(l_ptr) || -		    (l_ptr->priority < highest_prio)) -			continue; +	tipc_bearer_remove_dest(n->net, *bearer_id, n->addr); -		if (l_ptr->priority > highest_prio) { -			highest_prio = l_ptr->priority; -			active[0] = active[1] = l_ptr; -		} else { -			active[1] = l_ptr; +	pr_debug("Lost link <%s> on network plane %c\n", +		 l->name, l->net_plane); + +	/* Select new active link if any available */ +	*slot0 = INVALID_BEARER_ID; +	*slot1 = INVALID_BEARER_ID; +	for (i = 0; i < MAX_BEARERS; i++) { +		_l = n->links[i].link; +		if (!_l || !tipc_link_is_up(_l)) +			continue; +		if (_l == l) +			continue; +		if (_l->priority < highest) +			continue; +		if (_l->priority > highest) { +			highest = _l->priority; +			*slot0 = i; +			*slot1 = i; +			continue;  		} +		*slot1 = i; +	} + +	if (!tipc_node_is_up(n)) { +		tipc_link_reset(l); +		node_lost_contact(n, &le->inputq); +		return;  	} + +	/* There is still a working link => initiate failover */ +	tnl = node_active_link(n, 0); +	tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT); +	tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); +	n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); +	tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); +	tipc_link_reset(l); +	tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); +	tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); +	*maddr = &n->links[tnl->bearer_id].maddr; +	*bearer_id = tnl->bearer_id;  } -/** - * tipc_node_link_down - handle loss of link - */ -void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)  { -	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); -	struct tipc_link **active; +	struct tipc_link_entry *le = &n->links[bearer_id]; +	struct tipc_media_addr *maddr; +	struct sk_buff_head xmitq; + +	__skb_queue_head_init(&xmitq); + +	tipc_node_lock(n); +	__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); +	if (delete && le->link) { +		kfree(le->link); +		le->link = NULL; +		n->link_cnt--; +	} +	tipc_node_unlock(n); -	n_ptr->working_links--; -	n_ptr->action_flags |= TIPC_NOTIFY_LINK_DOWN; -	n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id; +	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); +	tipc_sk_rcv(n->net, &le->inputq); +} -	if (!tipc_link_is_active(l_ptr)) { -		pr_debug("Lost standby link <%s> on network plane %c\n", -			 l_ptr->name, l_ptr->net_plane); -		return; -	} -	pr_debug("Lost link <%s> on network plane %c\n", -		 l_ptr->name, l_ptr->net_plane); - -	active = &n_ptr->active_links[0]; -	if (active[0] == l_ptr) -		active[0] = active[1]; -	if (active[1] == l_ptr) -		active[1] = active[0]; -	if (active[0] == l_ptr) -		node_select_active_links(n_ptr); -	if (tipc_node_is_up(n_ptr)) -		tipc_link_failover_send_queue(l_ptr); -	else -		node_lost_contact(n_ptr); - -	/* Leave room for changeover header when returning 'mtu' to users: */ -	if (active[0]) { -		n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE; -		n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE; +bool tipc_node_is_up(struct tipc_node *n) +{ +	return n->active_links[0] != INVALID_BEARER_ID; +} + +void tipc_node_check_dest(struct net *net, u32 onode, +			  struct tipc_bearer *b, +			  u16 capabilities, u32 signature, +			  struct tipc_media_addr *maddr, +			  bool *respond, bool *dupl_addr) +{ +	struct tipc_node *n; +	struct tipc_link *l; +	struct tipc_link_entry *le; +	bool addr_match = false; +	bool sign_match = false; +	bool link_up = false; +	bool accept_addr = false; +	bool reset = true; + +	*dupl_addr = false; +	*respond = false; + +	n = tipc_node_create(net, onode, capabilities); +	if (!n)  		return; + +	tipc_node_lock(n); + +	le = &n->links[b->identity]; + +	/* Prepare to validate requesting node's signature and media address */ +	l = le->link; +	link_up = l && tipc_link_is_up(l); +	addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr)); +	sign_match = (signature == n->signature); + +	/* These three flags give us eight permutations: */ + +	if (sign_match && addr_match && link_up) { +		/* All is fine. Do nothing. */ +		reset = false; +	} else if (sign_match && addr_match && !link_up) { +		/* Respond. The link will come up in due time */ +		*respond = true; +	} else if (sign_match && !addr_match && link_up) { +		/* Peer has changed i/f address without rebooting. +		 * If so, the link will reset soon, and the next +		 * discovery will be accepted. So we can ignore it. +		 * It may also be an cloned or malicious peer having +		 * chosen the same node address and signature as an +		 * existing one. +		 * Ignore requests until the link goes down, if ever. +		 */ +		*dupl_addr = true; +	} else if (sign_match && !addr_match && !link_up) { +		/* Peer link has changed i/f address without rebooting. +		 * It may also be a cloned or malicious peer; we can't +		 * distinguish between the two. +		 * The signature is correct, so we must accept. +		 */ +		accept_addr = true; +		*respond = true; +	} else if (!sign_match && addr_match && link_up) { +		/* Peer node rebooted. Two possibilities: +		 *  - Delayed re-discovery; this link endpoint has already +		 *    reset and re-established contact with the peer, before +		 *    receiving a discovery message from that node. +		 *    (The peer happened to receive one from this node first). +		 *  - The peer came back so fast that our side has not +		 *    discovered it yet. Probing from this side will soon +		 *    reset the link, since there can be no working link +		 *    endpoint at the peer end, and the link will re-establish. +		 *  Accept the signature, since it comes from a known peer. +		 */ +		n->signature = signature; +	} else if (!sign_match && addr_match && !link_up) { +		/*  The peer node has rebooted. +		 *  Accept signature, since it is a known peer. +		 */ +		n->signature = signature; +		*respond = true; +	} else if (!sign_match && !addr_match && link_up) { +		/* Peer rebooted with new address, or a new/duplicate peer. +		 * Ignore until the link goes down, if ever. +		 */ +		*dupl_addr = true; +	} else if (!sign_match && !addr_match && !link_up) { +		/* Peer rebooted with new address, or it is a new peer. +		 * Accept signature and address. +		 */ +		n->signature = signature; +		accept_addr = true; +		*respond = true;  	} -	/* Loopback link went down? No fragmentation needed from now on. */ -	if (n_ptr->addr == tn->own_addr) { -		n_ptr->act_mtus[0] = MAX_MSG_SIZE; -		n_ptr->act_mtus[1] = MAX_MSG_SIZE; + +	if (!accept_addr) +		goto exit; + +	/* Now create new link if not already existing */ +	if (!l) { +		if (n->link_cnt == 2) { +			pr_warn("Cannot establish 3rd link to %x\n", n->addr); +			goto exit; +		} +		if (!tipc_link_create(n, b, mod(tipc_net(net)->random), +				      tipc_own_addr(net), onode, &le->maddr, +				      &le->inputq, &n->bclink.namedq, &l)) { +			*respond = false; +			goto exit; +		} +		tipc_link_reset(l); +		if (n->state == NODE_FAILINGOVER) +			tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); +		le->link = l; +		n->link_cnt++; +		tipc_node_calculate_timer(n, l); +		if (n->link_cnt == 1) +			if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) +				tipc_node_get(n);  	} +	memcpy(&le->maddr, maddr, sizeof(*maddr)); +exit: +	tipc_node_unlock(n); +	if (reset) +		tipc_node_link_down(n, b->identity, false); +	tipc_node_put(n);  } -int tipc_node_active_links(struct tipc_node *n_ptr) +void tipc_node_delete_links(struct net *net, int bearer_id)  { -	return n_ptr->active_links[0] != NULL; +	struct tipc_net *tn = net_generic(net, tipc_net_id); +	struct tipc_node *n; + +	rcu_read_lock(); +	list_for_each_entry_rcu(n, &tn->node_list, list) { +		tipc_node_link_down(n, bearer_id, true); +	} +	rcu_read_unlock();  } -int tipc_node_is_up(struct tipc_node *n_ptr) +static void tipc_node_reset_links(struct tipc_node *n)  { -	return tipc_node_active_links(n_ptr); +	char addr_string[16]; +	int i; + +	pr_warn("Resetting all links to %s\n", +		tipc_addr_string_fill(addr_string, n->addr)); + +	for (i = 0; i < MAX_BEARERS; i++) { +		tipc_node_link_down(n, i, false); +	}  } -void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +/* tipc_node_fsm_evt - node finite state machine + * Determines when contact is allowed with peer node + */ +static void tipc_node_fsm_evt(struct tipc_node *n, int evt)  { -	n_ptr->links[l_ptr->bearer_id] = l_ptr; -	n_ptr->link_cnt++; +	int state = n->state; + +	switch (state) { +	case SELF_DOWN_PEER_DOWN: +		switch (evt) { +		case SELF_ESTABL_CONTACT_EVT: +			state = SELF_UP_PEER_COMING; +			break; +		case PEER_ESTABL_CONTACT_EVT: +			state = SELF_COMING_PEER_UP; +			break; +		case SELF_LOST_CONTACT_EVT: +		case PEER_LOST_CONTACT_EVT: +			break; +		case NODE_SYNCH_END_EVT: +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_FAILOVER_BEGIN_EVT: +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case SELF_UP_PEER_UP: +		switch (evt) { +		case SELF_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_LEAVING; +			break; +		case PEER_LOST_CONTACT_EVT: +			state = SELF_LEAVING_PEER_DOWN; +			break; +		case NODE_SYNCH_BEGIN_EVT: +			state = NODE_SYNCHING; +			break; +		case NODE_FAILOVER_BEGIN_EVT: +			state = NODE_FAILINGOVER; +			break; +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +		case NODE_SYNCH_END_EVT: +		case NODE_FAILOVER_END_EVT: +			break; +		default: +			goto illegal_evt; +		} +		break; +	case SELF_DOWN_PEER_LEAVING: +		switch (evt) { +		case PEER_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_DOWN; +			break; +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +		case SELF_LOST_CONTACT_EVT: +			break; +		case NODE_SYNCH_END_EVT: +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_FAILOVER_BEGIN_EVT: +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case SELF_UP_PEER_COMING: +		switch (evt) { +		case PEER_ESTABL_CONTACT_EVT: +			state = SELF_UP_PEER_UP; +			break; +		case SELF_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_LEAVING; +			break; +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_LOST_CONTACT_EVT: +			break; +		case NODE_SYNCH_END_EVT: +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_FAILOVER_BEGIN_EVT: +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case SELF_COMING_PEER_UP: +		switch (evt) { +		case SELF_ESTABL_CONTACT_EVT: +			state = SELF_UP_PEER_UP; +			break; +		case PEER_LOST_CONTACT_EVT: +			state = SELF_LEAVING_PEER_DOWN; +			break; +		case SELF_LOST_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +			break; +		case NODE_SYNCH_END_EVT: +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_FAILOVER_BEGIN_EVT: +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case SELF_LEAVING_PEER_DOWN: +		switch (evt) { +		case SELF_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_DOWN; +			break; +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +		case PEER_LOST_CONTACT_EVT: +			break; +		case NODE_SYNCH_END_EVT: +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_FAILOVER_BEGIN_EVT: +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case NODE_FAILINGOVER: +		switch (evt) { +		case SELF_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_LEAVING; +			break; +		case PEER_LOST_CONTACT_EVT: +			state = SELF_LEAVING_PEER_DOWN; +			break; +		case NODE_FAILOVER_END_EVT: +			state = SELF_UP_PEER_UP; +			break; +		case NODE_FAILOVER_BEGIN_EVT: +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +			break; +		case NODE_SYNCH_BEGIN_EVT: +		case NODE_SYNCH_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	case NODE_SYNCHING: +		switch (evt) { +		case SELF_LOST_CONTACT_EVT: +			state = SELF_DOWN_PEER_LEAVING; +			break; +		case PEER_LOST_CONTACT_EVT: +			state = SELF_LEAVING_PEER_DOWN; +			break; +		case NODE_SYNCH_END_EVT: +			state = SELF_UP_PEER_UP; +			break; +		case NODE_FAILOVER_BEGIN_EVT: +			state = NODE_FAILINGOVER; +			break; +		case NODE_SYNCH_BEGIN_EVT: +		case SELF_ESTABL_CONTACT_EVT: +		case PEER_ESTABL_CONTACT_EVT: +			break; +		case NODE_FAILOVER_END_EVT: +		default: +			goto illegal_evt; +		} +		break; +	default: +		pr_err("Unknown node fsm state %x\n", state); +		break; +	} +	n->state = state; +	return; + +illegal_evt: +	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);  } -void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) +bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)  { -	int i; +	int state = n->state; -	for (i = 0; i < MAX_BEARERS; i++) { -		if (l_ptr != n_ptr->links[i]) -			continue; -		n_ptr->links[i] = NULL; -		n_ptr->link_cnt--; +	if (likely(state == SELF_UP_PEER_UP)) +		return true; + +	if (state == SELF_LEAVING_PEER_DOWN) +		return false; + +	if (state == SELF_DOWN_PEER_LEAVING) { +		if (msg_peer_node_is_up(hdr)) +			return false;  	} + +	return true;  }  static void node_established_contact(struct tipc_node *n_ptr)  { +	tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);  	n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;  	n_ptr->bclink.oos_state = 0;  	n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);  	tipc_bclink_add_node(n_ptr->net, n_ptr->addr);  } -static void node_lost_contact(struct tipc_node *n_ptr) +static void node_lost_contact(struct tipc_node *n_ptr, +			      struct sk_buff_head *inputq)  {  	char addr_string[16];  	struct tipc_sock_conn *conn, *safe; +	struct tipc_link *l;  	struct list_head *conns = &n_ptr->conn_sks;  	struct sk_buff *skb;  	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); @@ -396,21 +844,13 @@ static void node_lost_contact(struct tipc_node *n_ptr)  	/* Abort any ongoing link failover */  	for (i = 0; i < MAX_BEARERS; i++) { -		struct tipc_link *l_ptr = n_ptr->links[i]; -		if (!l_ptr) -			continue; -		l_ptr->flags &= ~LINK_FAILINGOVER; -		l_ptr->failover_checkpt = 0; -		l_ptr->failover_pkts = 0; -		kfree_skb(l_ptr->failover_skb); -		l_ptr->failover_skb = NULL; -		tipc_link_reset_fragments(l_ptr); +		l = n_ptr->links[i].link; +		if (l) +			tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);  	} -	n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN; -  	/* Prevent re-contact with node until cleanup is done */ -	n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN; +	tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);  	/* Notify publications from this node */  	n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; @@ -421,10 +861,8 @@ static void node_lost_contact(struct tipc_node *n_ptr)  				      SHORT_H_SIZE, 0, tn->own_addr,  				      conn->peer_node, conn->port,  				      conn->peer_port, TIPC_ERR_NO_NODE); -		if (likely(skb)) { -			skb_queue_tail(n_ptr->inputq, skb); -			n_ptr->action_flags |= TIPC_MSG_EVT; -		} +		if (likely(skb)) +			skb_queue_tail(inputq, skb);  		list_del(&conn->list);  		kfree(conn);  	} @@ -453,7 +891,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,  		goto exit;  	tipc_node_lock(node); -	link = node->links[bearer_id]; +	link = node->links[bearer_id].link;  	if (link) {  		strncpy(linkname, link->name, len);  		err = 0; @@ -471,27 +909,20 @@ void tipc_node_unlock(struct tipc_node *node)  	u32 flags = node->action_flags;  	u32 link_id = 0;  	struct list_head *publ_list; -	struct sk_buff_head *inputq = node->inputq; -	struct sk_buff_head *namedq; -	if (likely(!flags || (flags == TIPC_MSG_EVT))) { -		node->action_flags = 0; +	if (likely(!flags)) {  		spin_unlock_bh(&node->lock); -		if (flags == TIPC_MSG_EVT) -			tipc_sk_rcv(net, inputq);  		return;  	}  	addr = node->addr;  	link_id = node->link_id; -	namedq = node->namedq;  	publ_list = &node->publ_list; -	node->action_flags &= ~(TIPC_MSG_EVT | -				TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | +	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |  				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |  				TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT | -				TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET); +				TIPC_BCAST_RESET);  	spin_unlock_bh(&node->lock); @@ -512,17 +943,11 @@ void tipc_node_unlock(struct tipc_node *node)  		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,  				      link_id, addr); -	if (flags & TIPC_MSG_EVT) -		tipc_sk_rcv(net, inputq); - -	if (flags & TIPC_NAMED_MSG_EVT) -		tipc_named_rcv(net, namedq); -  	if (flags & TIPC_BCAST_MSG_EVT)  		tipc_bclink_input(net);  	if (flags & TIPC_BCAST_RESET) -		tipc_link_reset_all(node); +		tipc_node_reset_links(node);  }  /* Caller should hold node lock for the passed node */ @@ -559,6 +984,290 @@ msg_full:  	return -EMSGSIZE;  } +static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, +					       int *bearer_id, +					       struct tipc_media_addr **maddr) +{ +	int id = n->active_links[sel & 1]; + +	if (unlikely(id < 0)) +		return NULL; + +	*bearer_id = id; +	*maddr = &n->links[id].maddr; +	return n->links[id].link; +} + +/** + * tipc_node_xmit() is the general link level function for message sending + * @net: the applicable net namespace + * @list: chain of buffers containing message + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, +		   u32 dnode, int selector) +{ +	struct tipc_link *l = NULL; +	struct tipc_node *n; +	struct sk_buff_head xmitq; +	struct tipc_media_addr *maddr; +	int bearer_id; +	int rc = -EHOSTUNREACH; + +	__skb_queue_head_init(&xmitq); +	n = tipc_node_find(net, dnode); +	if (likely(n)) { +		tipc_node_lock(n); +		l = tipc_node_select_link(n, selector, &bearer_id, &maddr); +		if (likely(l)) +			rc = tipc_link_xmit(l, list, &xmitq); +		tipc_node_unlock(n); +		if (unlikely(rc == -ENOBUFS)) +			tipc_node_link_down(n, bearer_id, false); +		tipc_node_put(n); +	} +	if (likely(!rc)) { +		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); +		return 0; +	} +	if (likely(in_own_node(net, dnode))) { +		tipc_sk_rcv(net, list); +		return 0; +	} +	return rc; +} + +/* tipc_node_xmit_skb(): send single buffer to destination + * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE + * messages, which will not be rejected + * The only exception is datagram messages rerouted after secondary + * lookup, which are rare and safe to dispose of anyway. + * TODO: Return real return value, and let callers use + * tipc_wait_for_sendpkt() where applicable + */ +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, +		       u32 selector) +{ +	struct sk_buff_head head; +	int rc; + +	skb_queue_head_init(&head); +	__skb_queue_tail(&head, skb); +	rc = tipc_node_xmit(net, &head, dnode, selector); +	if (rc == -ELINKCONG) +		kfree_skb(skb); +	return 0; +} + +/** + * tipc_node_check_state - check and if necessary update node state + * @skb: TIPC packet + * @bearer_id: identity of bearer delivering the packet + * Returns true if state is ok, otherwise consumes buffer and returns false + */ +static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, +				  int bearer_id, struct sk_buff_head *xmitq) +{ +	struct tipc_msg *hdr = buf_msg(skb); +	int usr = msg_user(hdr); +	int mtyp = msg_type(hdr); +	u16 oseqno = msg_seqno(hdr); +	u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); +	u16 exp_pkts = msg_msgcnt(hdr); +	u16 rcv_nxt, syncpt, dlv_nxt; +	int state = n->state; +	struct tipc_link *l, *tnl, *pl = NULL; +	struct tipc_media_addr *maddr; +	int i, pb_id; + +	l = n->links[bearer_id].link; +	if (!l) +		return false; +	rcv_nxt = l->rcv_nxt; + + +	if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) +		return true; + +	/* Find parallel link, if any */ +	for (i = 0; i < MAX_BEARERS; i++) { +		if ((i != bearer_id) && n->links[i].link) { +			pl = n->links[i].link; +			break; +		} +	} + +	/* Update node accesibility if applicable */ +	if (state == SELF_UP_PEER_COMING) { +		if (!tipc_link_is_up(l)) +			return true; +		if (!msg_peer_link_is_up(hdr)) +			return true; +		tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT); +	} + +	if (state == SELF_DOWN_PEER_LEAVING) { +		if (msg_peer_node_is_up(hdr)) +			return false; +		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT); +	} + +	/* Ignore duplicate packets */ +	if (less(oseqno, rcv_nxt)) +		return true; + +	/* Initiate or update failover mode if applicable */ +	if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { +		syncpt = oseqno + exp_pkts - 1; +		if (pl && tipc_link_is_up(pl)) { +			pb_id = pl->bearer_id; +			__tipc_node_link_down(n, &pb_id, xmitq, &maddr); +			tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq); +		} +		/* If pkts arrive out of order, use lowest calculated syncpt */ +		if (less(syncpt, n->sync_point)) +			n->sync_point = syncpt; +	} + +	/* Open parallel link when tunnel link reaches synch point */ +	if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) { +		if (!more(rcv_nxt, n->sync_point)) +			return true; +		tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT); +		if (pl) +			tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT); +		return true; +	} + +	/* No synching needed if only one link */ +	if (!pl || !tipc_link_is_up(pl)) +		return true; + +	/* Initiate or update synch mode if applicable */ +	if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) { +		syncpt = iseqno + exp_pkts - 1; +		if (!tipc_link_is_up(l)) { +			tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); +			__tipc_node_link_up(n, bearer_id, xmitq); +		} +		if (n->state == SELF_UP_PEER_UP) { +			n->sync_point = syncpt; +			tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT); +			tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT); +		} +		if (less(syncpt, n->sync_point)) +			n->sync_point = syncpt; +	} + +	/* Open tunnel link when parallel link reaches synch point */ +	if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) { +		if (tipc_link_is_synching(l)) { +			tnl = l; +		} else { +			tnl = pl; +			pl = l; +		} +		dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq)); +		if (more(dlv_nxt, n->sync_point)) { +			tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT); +			tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT); +			return true; +		} +		if (l == pl) +			return true; +		if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) +			return true; +		if (usr == LINK_PROTOCOL) +			return true; +		return false; +	} +	return true; +} + +/** + * tipc_rcv - process TIPC packets/messages arriving from off-node + * @net: the applicable net namespace + * @skb: TIPC packet + * @bearer: pointer to bearer message arrived on + * + * Invoked with no locks held. Bearer pointer must point to a valid bearer + * structure (i.e. cannot be NULL), but bearer can be inactive. + */ +void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) +{ +	struct sk_buff_head xmitq; +	struct tipc_node *n; +	struct tipc_msg *hdr = buf_msg(skb); +	int usr = msg_user(hdr); +	int bearer_id = b->identity; +	struct tipc_link_entry *le; +	int rc = 0; + +	__skb_queue_head_init(&xmitq); + +	/* Ensure message is well-formed */ +	if (unlikely(!tipc_msg_validate(skb))) +		goto discard; + +	/* Handle arrival of a non-unicast link packet */ +	if (unlikely(msg_non_seq(hdr))) { +		if (usr ==  LINK_CONFIG) +			tipc_disc_rcv(net, skb, b); +		else +			tipc_bclink_rcv(net, skb); +		return; +	} + +	/* Locate neighboring node that sent packet */ +	n = tipc_node_find(net, msg_prevnode(hdr)); +	if (unlikely(!n)) +		goto discard; +	le = &n->links[bearer_id]; + +	tipc_node_lock(n); + +	/* Is reception permitted at the moment ? */ +	if (!tipc_node_filter_pkt(n, hdr)) +		goto unlock; + +	if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) +		tipc_bclink_sync_state(n, hdr); + +	/* Release acked broadcast packets */ +	if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) +		tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); + +	/* Check and if necessary update node state */ +	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { +		rc = tipc_link_rcv(le->link, skb, &xmitq); +		skb = NULL; +	} +unlock: +	tipc_node_unlock(n); + +	if (unlikely(rc & TIPC_LINK_UP_EVT)) +		tipc_node_link_up(n, bearer_id, &xmitq); + +	if (unlikely(rc & TIPC_LINK_DOWN_EVT)) +		tipc_node_link_down(n, bearer_id, false); + +	if (unlikely(!skb_queue_empty(&n->bclink.namedq))) +		tipc_named_rcv(net, &n->bclink.namedq); + +	if (!skb_queue_empty(&le->inputq)) +		tipc_sk_rcv(net, &le->inputq); + +	if (!skb_queue_empty(&xmitq)) +		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); + +	tipc_node_put(n); +discard: +	kfree_skb(skb); +} +  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)  {  	int err; diff --git a/net/tipc/node.h b/net/tipc/node.h index 5a834cf142c8..344b3e7594fd 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -45,23 +45,19 @@  /* Out-of-range value for node signature */  #define INVALID_NODE_SIG	0x10000 +#define INVALID_BEARER_ID -1 +  /* Flags used to take different actions according to flag type - * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down - * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down   * TIPC_NOTIFY_NODE_DOWN: notify node is down   * TIPC_NOTIFY_NODE_UP: notify node is up   * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type   */  enum { -	TIPC_MSG_EVT                    = 1, -	TIPC_WAIT_PEER_LINKS_DOWN	= (1 << 1), -	TIPC_WAIT_OWN_LINKS_DOWN	= (1 << 2),  	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),  	TIPC_NOTIFY_NODE_UP		= (1 << 4),  	TIPC_WAKEUP_BCAST_USERS		= (1 << 5),  	TIPC_NOTIFY_LINK_UP		= (1 << 6),  	TIPC_NOTIFY_LINK_DOWN		= (1 << 7), -	TIPC_NAMED_MSG_EVT		= (1 << 8),  	TIPC_BCAST_MSG_EVT		= (1 << 9),  	TIPC_BCAST_RESET		= (1 << 10)  }; @@ -85,10 +81,17 @@ struct tipc_node_bclink {  	u32 deferred_size;  	struct sk_buff_head deferdq;  	struct sk_buff *reasm_buf; -	int inputq_map; +	struct sk_buff_head namedq;  	bool recv_permitted;  }; +struct tipc_link_entry { +	struct tipc_link *link; +	u32 mtu; +	struct sk_buff_head inputq; +	struct tipc_media_addr maddr; +}; +  /**   * struct tipc_node - TIPC node structure   * @addr: network address of node @@ -98,11 +101,12 @@ struct tipc_node_bclink {   * @hash: links to adjacent nodes in unsorted hash chain   * @inputq: pointer to input queue containing messages for msg event   * @namedq: pointer to name table input queue with name table messages - * @curr_link: the link holding the node lock, if any - * @active_links: pointers to active links to node - * @links: pointers to all links to node + * @active_links: bearer ids of active links, used as index into links[] array + * @links: array containing references to all links to node   * @action_flags: bit mask of different types of node actions   * @bclink: broadcast-related info + * @state: connectivity state vs peer node + * @sync_point: sequence number where synch/failover is finished   * @list: links to adjacent nodes in sorted list of cluster's nodes   * @working_links: number of working links to node (both active and standby)   * @link_cnt: number of links to node @@ -118,14 +122,13 @@ struct tipc_node {  	spinlock_t lock;  	struct net *net;  	struct hlist_node hash; -	struct sk_buff_head *inputq; -	struct sk_buff_head *namedq; -	struct tipc_link *active_links[2]; -	u32 act_mtus[2]; -	struct tipc_link *links[MAX_BEARERS]; +	int active_links[2]; +	struct tipc_link_entry links[MAX_BEARERS];  	int action_flags;  	struct tipc_node_bclink bclink;  	struct list_head list; +	int state; +	u16 sync_point;  	int link_cnt;  	u16 working_links;  	u16 capabilities; @@ -133,25 +136,32 @@ struct tipc_node {  	u32 link_id;  	struct list_head publ_list;  	struct list_head conn_sks; +	unsigned long keepalive_intv; +	struct timer_list timer;  	struct rcu_head rcu;  };  struct tipc_node *tipc_node_find(struct net *net, u32 addr);  void tipc_node_put(struct tipc_node *node); -struct tipc_node *tipc_node_create(struct net *net, u32 addr);  void tipc_node_stop(struct net *net); +void tipc_node_check_dest(struct net *net, u32 onode, +			  struct tipc_bearer *bearer, +			  u16 capabilities, u32 signature, +			  struct tipc_media_addr *maddr, +			  bool *respond, bool *dupl_addr); +void tipc_node_delete_links(struct net *net, int bearer_id);  void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);  void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr); -int tipc_node_active_links(struct tipc_node *n_ptr); -int tipc_node_is_up(struct tipc_node *n_ptr); +bool tipc_node_is_up(struct tipc_node *n);  int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,  			   char *linkname, size_t len);  void tipc_node_unlock(struct tipc_node *node); +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, +		   int selector); +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, +		       u32 selector);  int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); -  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);  static inline void tipc_node_lock(struct tipc_node *node) @@ -159,26 +169,30 @@ static inline void tipc_node_lock(struct tipc_node *node)  	spin_lock_bh(&node->lock);  } -static inline bool tipc_node_blocked(struct tipc_node *node) +static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)  { -	return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN | -		TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN)); +	int bearer_id = n->active_links[sel & 1]; + +	if (unlikely(bearer_id == INVALID_BEARER_ID)) +		return NULL; + +	return n->links[bearer_id].link;  } -static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector) +static inline unsigned int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)  { -	struct tipc_node *node; -	u32 mtu; - -	node = tipc_node_find(net, addr); +	struct tipc_node *n; +	int bearer_id; +	unsigned int mtu = MAX_MSG_SIZE; -	if (likely(node)) { -		mtu = node->act_mtus[selector & 1]; -		tipc_node_put(node); -	} else { -		mtu = MAX_MSG_SIZE; -	} +	n = tipc_node_find(net, addr); +	if (unlikely(!n)) +		return mtu; +	bearer_id = n->active_links[sel & 1]; +	if (likely(bearer_id != INVALID_BEARER_ID)) +		mtu = n->links[bearer_id].mtu; +	tipc_node_put(n);  	return mtu;  } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 46b6ed534ef2..1060d52ff23e 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -248,6 +248,22 @@ static void tsk_advance_rx_queue(struct sock *sk)  	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));  } +/* tipc_sk_respond() : send response message back to sender + */ +static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err) +{ +	u32 selector; +	u32 dnode; +	u32 onode = tipc_own_addr(sock_net(sk)); + +	if (!tipc_msg_reverse(onode, &skb, err)) +		return; + +	dnode = msg_destnode(buf_msg(skb)); +	selector = msg_origport(buf_msg(skb)); +	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); +} +  /**   * tsk_rej_rx_queue - reject all buffers in socket receive queue   * @@ -256,13 +272,9 @@ static void tsk_advance_rx_queue(struct sock *sk)  static void tsk_rej_rx_queue(struct sock *sk)  {  	struct sk_buff *skb; -	u32 dnode; -	u32 own_node = tsk_own_node(tipc_sk(sk)); -	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { -		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) -			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); -	} +	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) +		tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);  }  /* tsk_peer_msg - verify if message was sent by connected port's peer @@ -441,9 +453,7 @@ static int tipc_release(struct socket *sock)  				tsk->connected = 0;  				tipc_node_remove_conn(net, dnode, tsk->portid);  			} -			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -					     TIPC_ERR_NO_PORT)) -				tipc_link_xmit_skb(net, skb, dnode, 0); +			tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);  		}  	} @@ -456,7 +466,7 @@ static int tipc_release(struct socket *sock)  				      tsk_own_node(tsk), tsk_peer_port(tsk),  				      tsk->portid, TIPC_ERR_NO_PORT);  		if (skb) -			tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);  		tipc_node_remove_conn(net, dnode, tsk->portid);  	} @@ -686,21 +696,22 @@ new_mtu:  	do {  		rc = tipc_bclink_xmit(net, pktchain); -		if (likely(rc >= 0)) { -			rc = dsz; -			break; +		if (likely(!rc)) +			return dsz; + +		if (rc == -ELINKCONG) { +			tsk->link_cong = 1; +			rc = tipc_wait_for_sndmsg(sock, &timeo); +			if (!rc) +				continue;  		} +		__skb_queue_purge(pktchain);  		if (rc == -EMSGSIZE) {  			msg->msg_iter = save;  			goto new_mtu;  		} -		if (rc != -ELINKCONG) -			break; -		tipc_sk(sk)->link_cong = 1; -		rc = tipc_wait_for_sndmsg(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain); -	} while (!rc); +		break; +	} while (1);  	return rc;  } @@ -763,35 +774,35 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  /**   * tipc_sk_proto_rcv - receive a connection mng protocol message   * @tsk: receiving socket - * @skb: pointer to message buffer. Set to NULL if buffer is consumed. + * @skb: pointer to message buffer.   */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)  { -	struct tipc_msg *msg = buf_msg(*skb); +	struct sock *sk = &tsk->sk; +	struct tipc_msg *hdr = buf_msg(skb); +	int mtyp = msg_type(hdr);  	int conn_cong; -	u32 dnode; -	u32 own_node = tsk_own_node(tsk); +  	/* Ignore if connection cannot be validated: */ -	if (!tsk_peer_msg(tsk, msg)) +	if (!tsk_peer_msg(tsk, hdr))  		goto exit;  	tsk->probing_state = TIPC_CONN_OK; -	if (msg_type(msg) == CONN_ACK) { +	if (mtyp == CONN_PROBE) { +		msg_set_type(hdr, CONN_PROBE_REPLY); +		tipc_sk_respond(sk, skb, TIPC_OK); +		return; +	} else if (mtyp == CONN_ACK) {  		conn_cong = tsk_conn_cong(tsk); -		tsk->sent_unacked -= msg_msgcnt(msg); +		tsk->sent_unacked -= msg_msgcnt(hdr);  		if (conn_cong) -			tsk->sk.sk_write_space(&tsk->sk); -	} else if (msg_type(msg) == CONN_PROBE) { -		if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) { -			msg_set_type(msg, CONN_PROBE_REPLY); -			return; -		} +			sk->sk_write_space(sk); +	} else if (mtyp != CONN_PROBE_REPLY) { +		pr_warn("Received unknown CONN_PROTO msg\n");  	} -	/* Do nothing if msg_type() == CONN_PROBE_REPLY */  exit: -	kfree_skb(*skb); -	*skb = NULL; +	kfree_skb(skb);  }  static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) @@ -924,24 +935,25 @@ new_mtu:  	do {  		skb = skb_peek(pktchain);  		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; -		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); -		if (likely(rc >= 0)) { +		rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid); +		if (likely(!rc)) {  			if (sock->state != SS_READY)  				sock->state = SS_CONNECTING; -			rc = dsz; -			break; +			return dsz;  		} +		if (rc == -ELINKCONG) { +			tsk->link_cong = 1; +			rc = tipc_wait_for_sndmsg(sock, &timeo); +			if (!rc) +				continue; +		} +		__skb_queue_purge(pktchain);  		if (rc == -EMSGSIZE) {  			m->msg_iter = save;  			goto new_mtu;  		} -		if (rc != -ELINKCONG) -			break; -		tsk->link_cong = 1; -		rc = tipc_wait_for_sndmsg(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain); -	} while (!rc); +		break; +	} while (1);  	return rc;  } @@ -1043,15 +1055,16 @@ next:  		return rc;  	do {  		if (likely(!tsk_conn_cong(tsk))) { -			rc = tipc_link_xmit(net, pktchain, dnode, portid); +			rc = tipc_node_xmit(net, pktchain, dnode, portid);  			if (likely(!rc)) {  				tsk->sent_unacked++;  				sent += send;  				if (sent == dsz) -					break; +					return dsz;  				goto next;  			}  			if (rc == -EMSGSIZE) { +				__skb_queue_purge(pktchain);  				tsk->max_pkt = tipc_node_get_mtu(net, dnode,  								 portid);  				m->msg_iter = save; @@ -1059,13 +1072,13 @@ next:  			}  			if (rc != -ELINKCONG)  				break; +  			tsk->link_cong = 1;  		}  		rc = tipc_wait_for_sndpkt(sock, &timeo); -		if (rc) -			__skb_queue_purge(pktchain);  	} while (!rc); +	__skb_queue_purge(pktchain);  	return sent ? sent : rc;  } @@ -1221,7 +1234,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)  		return;  	msg = buf_msg(skb);  	msg_set_msgcnt(msg, ack); -	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg)); +	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));  }  static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1507,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk)   * @tsk: TIPC socket   * @skb: pointer to message buffer. Set to NULL if buffer is consumed   * - * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise + * Returns true if everything ok, false otherwise   */ -static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb) +static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)  {  	struct sock *sk = &tsk->sk;  	struct net *net = sock_net(sk);  	struct socket *sock = sk->sk_socket; -	struct tipc_msg *msg = buf_msg(*skb); -	int retval = -TIPC_ERR_NO_PORT; +	struct tipc_msg *hdr = buf_msg(skb); -	if (msg_mcast(msg)) -		return retval; +	if (unlikely(msg_mcast(hdr))) +		return false;  	switch ((int)sock->state) {  	case SS_CONNECTED: +  		/* Accept only connection-based messages sent by peer */ -		if (tsk_peer_msg(tsk, msg)) { -			if (unlikely(msg_errcode(msg))) { -				sock->state = SS_DISCONNECTING; -				tsk->connected = 0; -				/* let timer expire on it's own */ -				tipc_node_remove_conn(net, tsk_peer_node(tsk), -						      tsk->portid); -			} -			retval = TIPC_OK; +		if (unlikely(!tsk_peer_msg(tsk, hdr))) +			return false; + +		if (unlikely(msg_errcode(hdr))) { +			sock->state = SS_DISCONNECTING; +			tsk->connected = 0; +			/* Let timer expire on it's own */ +			tipc_node_remove_conn(net, tsk_peer_node(tsk), +					      tsk->portid);  		} -		break; +		return true; +  	case SS_CONNECTING: -		/* Accept only ACK or NACK message */ -		if (unlikely(!msg_connected(msg))) -			break; +		/* Accept only ACK or NACK message */ +		if (unlikely(!msg_connected(hdr))) +			return false; -		if (unlikely(msg_errcode(msg))) { +		if (unlikely(msg_errcode(hdr))) {  			sock->state = SS_DISCONNECTING;  			sk->sk_err = ECONNREFUSED; -			retval = TIPC_OK; -			break; +			return true;  		} -		if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) { +		if (unlikely(!msg_isdata(hdr))) {  			sock->state = SS_DISCONNECTING;  			sk->sk_err = EINVAL; -			retval = TIPC_OK; -			break; +			return true;  		} -		tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg)); -		msg_set_importance(&tsk->phdr, msg_importance(msg)); +		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr)); +		msg_set_importance(&tsk->phdr, msg_importance(hdr));  		sock->state = SS_CONNECTED; -		/* If an incoming message is an 'ACK-', it should be -		 * discarded here because it doesn't contain useful -		 * data. In addition, we should try to wake up -		 * connect() routine if sleeping. -		 */ -		if (msg_data_sz(msg) == 0) { -			kfree_skb(*skb); -			*skb = NULL; -			if (waitqueue_active(sk_sleep(sk))) -				wake_up_interruptible(sk_sleep(sk)); -		} -		retval = TIPC_OK; -		break; +		/* If 'ACK+' message, add to socket receive queue */ +		if (msg_data_sz(hdr)) +			return true; + +		/* If empty 'ACK-' message, wake up sleeping connect() */ +		if (waitqueue_active(sk_sleep(sk))) +			wake_up_interruptible(sk_sleep(sk)); + +		/* 'ACK-' message is neither accepted nor rejected: */ +		msg_set_dest_droppable(hdr, 1); +		return false; +  	case SS_LISTENING:  	case SS_UNCONNECTED: +  		/* Accept only SYN message */ -		if (!msg_connected(msg) && !(msg_errcode(msg))) -			retval = TIPC_OK; +		if (!msg_connected(hdr) && !(msg_errcode(hdr))) +			return true;  		break;  	case SS_DISCONNECTING:  		break;  	default:  		pr_err("Unknown socket state %u\n", sock->state);  	} -	return retval; +	return false;  }  /** @@ -1617,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)  /**   * filter_rcv - validate incoming message   * @sk: socket - * @skb: pointer to message. Set to NULL if buffer is consumed. + * @skb: pointer to message.   *   * Enqueues message on receive queue if acceptable; optionally handles   * disconnect indication for a connected socket.   *   * Called with socket lock already taken   * - * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected + * Returns true if message was added to socket receive queue, otherwise false   */ -static int filter_rcv(struct sock *sk, struct sk_buff **skb) +static bool filter_rcv(struct sock *sk, struct sk_buff *skb)  {  	struct socket *sock = sk->sk_socket;  	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_msg *msg = buf_msg(*skb); -	unsigned int limit = rcvbuf_limit(sk, *skb); -	int rc = TIPC_OK; +	struct tipc_msg *hdr = buf_msg(skb); +	unsigned int limit = rcvbuf_limit(sk, skb); +	int err = TIPC_OK; +	int usr = msg_user(hdr); -	if (unlikely(msg_user(msg) == CONN_MANAGER)) { +	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {  		tipc_sk_proto_rcv(tsk, skb); -		return TIPC_OK; +		return false;  	} -	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) { -		kfree_skb(*skb); +	if (unlikely(usr == SOCK_WAKEUP)) { +		kfree_skb(skb);  		tsk->link_cong = 0;  		sk->sk_write_space(sk); -		*skb = NULL; -		return TIPC_OK; +		return false;  	} -	/* Reject message if it is wrong sort of message for socket */ -	if (msg_type(msg) > TIPC_DIRECT_MSG) -		return -TIPC_ERR_NO_PORT; +	/* Drop if illegal message type */ +	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { +		kfree_skb(skb); +		return false; +	} -	if (sock->state == SS_READY) { -		if (msg_connected(msg)) -			return -TIPC_ERR_NO_PORT; -	} else { -		rc = filter_connect(tsk, skb); -		if (rc != TIPC_OK || !*skb) -			return rc; +	/* Reject if wrong message type for current socket state */ +	if (unlikely(sock->state == SS_READY)) { +		if (msg_connected(hdr)) { +			err = TIPC_ERR_NO_PORT; +			goto reject; +		} +	} else if (unlikely(!filter_connect(tsk, skb))) { +		err = TIPC_ERR_NO_PORT; +		goto reject;  	}  	/* Reject message if there isn't room to queue it */ -	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit) -		return -TIPC_ERR_OVERLOAD; +	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { +		err = TIPC_ERR_OVERLOAD; +		goto reject; +	}  	/* Enqueue message */ -	TIPC_SKB_CB(*skb)->handle = NULL; -	__skb_queue_tail(&sk->sk_receive_queue, *skb); -	skb_set_owner_r(*skb, sk); +	TIPC_SKB_CB(skb)->handle = NULL; +	__skb_queue_tail(&sk->sk_receive_queue, skb); +	skb_set_owner_r(skb, sk);  	sk->sk_data_ready(sk); -	*skb = NULL; -	return TIPC_OK; +	return true; + +reject: +	tipc_sk_respond(sk, skb, err); +	return false;  }  /** @@ -1685,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)   */  static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)  { -	int err; -	atomic_t *dcnt; -	u32 dnode; -	struct tipc_sock *tsk = tipc_sk(sk); -	struct net *net = sock_net(sk); -	uint truesize = skb->truesize; +	unsigned int truesize = skb->truesize; -	err = filter_rcv(sk, &skb); -	if (likely(!skb)) { -		dcnt = &tsk->dupl_rcvcnt; -		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT) -			atomic_add(truesize, dcnt); -		return 0; -	} -	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) -		tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +	if (likely(filter_rcv(sk, skb))) +		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);  	return 0;  } @@ -1710,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)   * @inputq: list of incoming buffers with potentially different destinations   * @sk: socket where the buffers should be enqueued   * @dport: port number for the socket - * @_skb: returned buffer to be forwarded or rejected, if applicable   *   * Caller must hold socket lock - * - * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD - * or -TIPC_ERR_NO_PORT   */ -static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, -			   u32 dport, struct sk_buff **_skb) +static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, +			    u32 dport)  {  	unsigned int lim;  	atomic_t *dcnt; -	int err;  	struct sk_buff *skb;  	unsigned long time_limit = jiffies + 2;  	while (skb_queue_len(inputq)) {  		if (unlikely(time_after_eq(jiffies, time_limit))) -			return TIPC_OK; +			return; +  		skb = tipc_skb_dequeue(inputq, dport);  		if (unlikely(!skb)) -			return TIPC_OK; +			return; + +		/* Add message directly to receive queue if possible */  		if (!sock_owned_by_user(sk)) { -			err = filter_rcv(sk, &skb); -			if (likely(!skb)) -				continue; -			*_skb = skb; -			return err; +			filter_rcv(sk, skb); +			continue;  		} + +		/* Try backlog, compensating for double-counted bytes */  		dcnt = &tipc_sk(sk)->dupl_rcvcnt;  		if (sk->sk_backlog.len)  			atomic_set(dcnt, 0);  		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);  		if (likely(!sk_add_backlog(sk, skb, lim)))  			continue; -		*_skb = skb; -		return -TIPC_ERR_OVERLOAD; + +		/* Overload => reject message back to sender */ +		tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); +		break;  	} -	return TIPC_OK;  }  /** @@ -1756,49 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,   * @inputq: buffer list containing the buffers   * Consumes all buffers in list until inputq is empty   * Note: may be called in multiple threads referring to the same queue - * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH - * Only node local calls check the return value, sending single-buffer queues   */ -int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) +void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)  {  	u32 dnode, dport = 0;  	int err; -	struct sk_buff *skb;  	struct tipc_sock *tsk; -	struct tipc_net *tn;  	struct sock *sk; +	struct sk_buff *skb;  	while (skb_queue_len(inputq)) { -		err = -TIPC_ERR_NO_PORT; -		skb = NULL;  		dport = tipc_skb_peek_port(inputq, dport);  		tsk = tipc_sk_lookup(net, dport); +  		if (likely(tsk)) {  			sk = &tsk->sk;  			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { -				err = tipc_sk_enqueue(inputq, sk, dport, &skb); +				tipc_sk_enqueue(inputq, sk, dport);  				spin_unlock_bh(&sk->sk_lock.slock); -				dport = 0;  			}  			sock_put(sk); -		} else { -			skb = tipc_skb_dequeue(inputq, dport); -		} -		if (likely(!skb))  			continue; -		if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) -			goto xmit; -		if (!err) { -			dnode = msg_destnode(buf_msg(skb)); -			goto xmit;  		} -		tn = net_generic(net, tipc_net_id); -		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) + +		/* No destination socket => dequeue skb if still there */ +		skb = tipc_skb_dequeue(inputq, dport); +		if (!skb) +			return; + +		/* Try secondary lookup if unresolved named message */ +		err = TIPC_ERR_NO_PORT; +		if (tipc_msg_lookup_dest(net, skb, &err)) +			goto xmit; + +		/* Prepare for message rejection */ +		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))  			continue;  xmit: -		tipc_link_xmit_skb(net, skb, dnode, dport); +		dnode = msg_destnode(buf_msg(skb)); +		tipc_node_xmit_skb(net, skb, dnode, dport);  	} -	return err ? -EHOSTUNREACH : 0;  }  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) @@ -2007,6 +2011,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);  	if (res)  		goto exit; +	security_sk_clone(sock->sk, new_sock->sk);  	new_sk = new_sock->sk;  	new_tsock = tipc_sk(new_sk); @@ -2066,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how)  	struct net *net = sock_net(sk);  	struct tipc_sock *tsk = tipc_sk(sk);  	struct sk_buff *skb; -	u32 dnode; +	u32 dnode = tsk_peer_node(tsk); +	u32 dport = tsk_peer_port(tsk); +	u32 onode = tipc_own_addr(net); +	u32 oport = tsk->portid;  	int res;  	if (how != SHUT_RDWR) @@ -2079,6 +2087,8 @@ static int tipc_shutdown(struct socket *sock, int how)  	case SS_CONNECTED:  restart: +		dnode = tsk_peer_node(tsk); +  		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */  		skb = __skb_dequeue(&sk->sk_receive_queue);  		if (skb) { @@ -2086,19 +2096,13 @@ restart:  				kfree_skb(skb);  				goto restart;  			} -			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -					     TIPC_CONN_SHUTDOWN)) -				tipc_link_xmit_skb(net, skb, dnode, -						   tsk->portid); +			tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);  		} else { -			dnode = tsk_peer_node(tsk); -  			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,  					      TIPC_CONN_MSG, SHORT_H_SIZE, -					      0, dnode, tsk_own_node(tsk), -					      tsk_peer_port(tsk), -					      tsk->portid, TIPC_CONN_SHUTDOWN); -			tipc_link_xmit_skb(net, skb, dnode, tsk->portid); +					      0, dnode, onode, dport, oport, +					      TIPC_CONN_SHUTDOWN); +			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);  		}  		tsk->connected = 0;  		sock->state = SS_DISCONNECTING; @@ -2160,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)  	}  	bh_unlock_sock(sk);  	if (skb) -		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); +		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);  exit:  	sock_put(sk);  } diff --git a/net/tipc/socket.h b/net/tipc/socket.h index bf6551389522..4241f22069dc 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -44,7 +44,7 @@  				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))  int tipc_socket_init(void);  void tipc_socket_stop(void); -int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); +void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);  void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		       struct sk_buff_head *inputq);  void tipc_sk_reinit(struct net *net); diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index 66deebc66aa1..c170d3138953 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -194,7 +194,8 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,  			.saddr = src->ipv6,  			.flowi6_proto = IPPROTO_UDP  		}; -		err = ipv6_stub->ipv6_dst_lookup(ub->ubsock->sk, &ndst, &fl6); +		err = ipv6_stub->ipv6_dst_lookup(net, ub->ubsock->sk, &ndst, +						 &fl6);  		if (err)  			goto tx_error;  		ttl = ip6_dst_hoplimit(ndst);  |