diff options
Diffstat (limited to 'net/tipc/link.c')
| -rw-r--r-- | net/tipc/link.c | 223 | 
1 files changed, 81 insertions, 142 deletions
| diff --git a/net/tipc/link.c b/net/tipc/link.c index 69cd9bf3f561..d4b5de41b682 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@  /*   * net/tipc/link.c: TIPC link code   * - * Copyright (c) 1996-2007, 2012, Ericsson AB + * Copyright (c) 1996-2007, 2012-2014, Ericsson AB   * Copyright (c) 2004-2007, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -78,8 +78,8 @@ static const char *link_unk_evt = "Unknown link event ";  static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  				       struct sk_buff *buf);  static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); -static int  link_recv_changeover_msg(struct tipc_link **l_ptr, -				     struct sk_buff **buf); +static int  tipc_link_tunnel_rcv(struct tipc_link **l_ptr, +				 struct sk_buff **buf);  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);  static int  link_send_sections_long(struct tipc_port *sender,  				    struct iovec const *msg_sect, @@ -87,7 +87,6 @@ static int  link_send_sections_long(struct tipc_port *sender,  static void link_state_event(struct tipc_link *l_ptr, u32 event);  static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str); -static void link_start(struct tipc_link *l_ptr);  static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);  static void tipc_link_send_sync(struct tipc_link *l);  static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); @@ -278,9 +277,11 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  	tipc_node_attach_link(n_ptr, l_ptr); -	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); +	k_init_timer(&l_ptr->timer, (Handler)link_timeout, +		     (unsigned long)l_ptr);  	list_add_tail(&l_ptr->link_list, &b_ptr->links); -	tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); + +	link_state_event(l_ptr, STARTING_EVT);  	return l_ptr;  } @@ -305,19 +306,13 @@ void tipc_link_delete(struct tipc_link *l_ptr)  	tipc_node_lock(l_ptr->owner);  	tipc_link_reset(l_ptr);  	tipc_node_detach_link(l_ptr->owner, l_ptr); -	tipc_link_stop(l_ptr); +	tipc_link_purge_queues(l_ptr);  	list_del_init(&l_ptr->link_list);  	tipc_node_unlock(l_ptr->owner);  	k_term_timer(&l_ptr->timer);  	kfree(l_ptr);  } -static void link_start(struct tipc_link *l_ptr) -{ -	tipc_node_lock(l_ptr->owner); -	link_state_event(l_ptr, STARTING_EVT); -	tipc_node_unlock(l_ptr->owner); -}  /**   * link_schedule_port - schedule port for deferred sending @@ -386,14 +381,7 @@ exit:   */  static void link_release_outqueue(struct tipc_link *l_ptr)  { -	struct sk_buff *buf = l_ptr->first_out; -	struct sk_buff *next; - -	while (buf) { -		next = buf->next; -		kfree_skb(buf); -		buf = next; -	} +	kfree_skb_list(l_ptr->first_out);  	l_ptr->first_out = NULL;  	l_ptr->out_queue_size = 0;  } @@ -410,37 +398,20 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)  }  /** - * tipc_link_stop - purge all inbound and outbound messages associated with link + * tipc_link_purge_queues - purge all pkt queues associated with link   * @l_ptr: pointer to link   */ -void tipc_link_stop(struct tipc_link *l_ptr) +void tipc_link_purge_queues(struct tipc_link *l_ptr)  { -	struct sk_buff *buf; -	struct sk_buff *next; - -	buf = l_ptr->oldest_deferred_in; -	while (buf) { -		next = buf->next; -		kfree_skb(buf); -		buf = next; -	} - -	buf = l_ptr->first_out; -	while (buf) { -		next = buf->next; -		kfree_skb(buf); -		buf = next; -	} - +	kfree_skb_list(l_ptr->oldest_deferred_in); +	kfree_skb_list(l_ptr->first_out);  	tipc_link_reset_fragments(l_ptr); -  	kfree_skb(l_ptr->proto_msg_queue);  	l_ptr->proto_msg_queue = NULL;  }  void tipc_link_reset(struct tipc_link *l_ptr)  { -	struct sk_buff *buf;  	u32 prev_state = l_ptr->state;  	u32 checkpoint = l_ptr->next_in_no;  	int was_active_link = tipc_link_is_active(l_ptr); @@ -461,8 +432,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)  	tipc_node_link_down(l_ptr->owner, l_ptr);  	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); -	if (was_active_link && tipc_node_active_links(l_ptr->owner) && -	    l_ptr->owner->permit_changeover) { +	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {  		l_ptr->reset_checkpoint = checkpoint;  		l_ptr->exp_msg_count = START_CHANGEOVER;  	} @@ -471,12 +441,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)  	link_release_outqueue(l_ptr);  	kfree_skb(l_ptr->proto_msg_queue);  	l_ptr->proto_msg_queue = NULL; -	buf = l_ptr->oldest_deferred_in; -	while (buf) { -		struct sk_buff *next = buf->next; -		kfree_skb(buf); -		buf = next; -	} +	kfree_skb_list(l_ptr->oldest_deferred_in);  	if (!list_empty(&l_ptr->waiting_ports))  		tipc_link_wakeup_ports(l_ptr, 1); @@ -517,10 +482,11 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  	if (!l_ptr->started && (event != STARTING_EVT))  		return;		/* Not yet. */ -	if (link_blocked(l_ptr)) { +	/* Check whether changeover is going on */ +	if (l_ptr->exp_msg_count) {  		if (event == TIMEOUT_EVT)  			link_set_timer(l_ptr, cont_intv); -		return;	  /* Changeover going on */ +		return;  	}  	switch (l_ptr->state) { @@ -790,8 +756,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		return link_send_long_buf(l_ptr, buf);  	/* Packet can be queued or sent. */ -	if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) && -		   !link_congested(l_ptr))) { +	if (likely(!link_congested(l_ptr))) {  		link_add_to_outqueue(l_ptr, buf, msg);  		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); @@ -957,14 +922,13 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,  	if (likely(!link_congested(l_ptr))) {  		if (likely(msg_size(msg) <= l_ptr->max_pkt)) { -			if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) { -				link_add_to_outqueue(l_ptr, buf, msg); -				tipc_bearer_send(l_ptr->b_ptr, buf, -						 &l_ptr->media_addr); -				l_ptr->unacked_window = 0; -				return res; -			} -		} else +			link_add_to_outqueue(l_ptr, buf, msg); +			tipc_bearer_send(l_ptr->b_ptr, buf, +					 &l_ptr->media_addr); +			l_ptr->unacked_window = 0; +			return res; +		} +		else  			*used_max_pkt = l_ptr->max_pkt;  	}  	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */ @@ -1013,8 +977,7 @@ exit:  			}  			/* Exit if link (or bearer) is congested */ -			if (link_congested(l_ptr) || -			    tipc_bearer_blocked(l_ptr->b_ptr)) { +			if (link_congested(l_ptr)) {  				res = link_schedule_port(l_ptr,  							 sender->ref, res);  				goto exit; @@ -1127,10 +1090,7 @@ again:  		if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {  			res = -EFAULT;  error: -			for (; buf_chain; buf_chain = buf) { -				buf = buf_chain->next; -				kfree_skb(buf_chain); -			} +			kfree_skb_list(buf_chain);  			return res;  		}  		sect_crs += sz; @@ -1180,18 +1140,12 @@ error:  		if (l_ptr->max_pkt < max_pkt) {  			sender->max_pkt = l_ptr->max_pkt;  			tipc_node_unlock(node); -			for (; buf_chain; buf_chain = buf) { -				buf = buf_chain->next; -				kfree_skb(buf_chain); -			} +			kfree_skb_list(buf_chain);  			goto again;  		}  	} else {  reject: -		for (; buf_chain; buf_chain = buf) { -			buf = buf_chain->next; -			kfree_skb(buf_chain); -		} +		kfree_skb_list(buf_chain);  		return tipc_port_reject_sections(sender, hdr, msg_sect,  						 len, TIPC_ERR_NO_NODE);  	} @@ -1209,7 +1163,7 @@ reject:  /*   * tipc_link_push_packet: Push one unsent packet to the media   */ -u32 tipc_link_push_packet(struct tipc_link *l_ptr) +static u32 tipc_link_push_packet(struct tipc_link *l_ptr)  {  	struct sk_buff *buf = l_ptr->first_out;  	u32 r_q_size = l_ptr->retransm_queue_size; @@ -1281,9 +1235,6 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)  {  	u32 res; -	if (tipc_bearer_blocked(l_ptr->b_ptr)) -		return; -  	do {  		res = tipc_link_push_packet(l_ptr);  	} while (!res); @@ -1370,26 +1321,15 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  	msg = buf_msg(buf); -	if (tipc_bearer_blocked(l_ptr->b_ptr)) { -		if (l_ptr->retransm_queue_size == 0) { -			l_ptr->retransm_queue_head = msg_seqno(msg); -			l_ptr->retransm_queue_size = retransmits; -		} else { -			pr_err("Unexpected retransmit on link %s (qsize=%d)\n", -			       l_ptr->name, l_ptr->retransm_queue_size); +	/* Detect repeated retransmit failures */ +	if (l_ptr->last_retransmitted == msg_seqno(msg)) { +		if (++l_ptr->stale_count > 100) { +			link_retransmit_failure(l_ptr, buf); +			return;  		} -		return;  	} else { -		/* Detect repeated retransmit failures on unblocked bearer */ -		if (l_ptr->last_retransmitted == msg_seqno(msg)) { -			if (++l_ptr->stale_count > 100) { -				link_retransmit_failure(l_ptr, buf); -				return; -			} -		} else { -			l_ptr->last_retransmitted = msg_seqno(msg); -			l_ptr->stale_count = 1; -		} +		l_ptr->last_retransmitted = msg_seqno(msg); +		l_ptr->stale_count = 1;  	}  	while (retransmits && (buf != l_ptr->next_out) && buf) { @@ -1476,14 +1416,14 @@ static int link_recv_buf_validate(struct sk_buff *buf)  }  /** - * tipc_recv_msg - process TIPC messages arriving from off-node + * tipc_rcv - process TIPC packets/messages arriving from off-node   * @head: pointer to message buffer chain   * @tb_ptr: pointer to bearer message arrived on   *   * Invoked with no locks held.  Bearer pointer must point to a valid bearer   * structure (i.e. cannot be NULL), but bearer can be inactive.   */ -void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) +void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  {  	read_lock_bh(&tipc_net_lock);  	while (head) { @@ -1498,6 +1438,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)  		int type;  		head = head->next; +		buf->next = NULL;  		/* Ensure bearer is still enabled */  		if (unlikely(!b_ptr->active)) @@ -1657,7 +1598,7 @@ deliver:  			continue;  		case CHANGEOVER_PROTOCOL:  			type = msg_type(msg); -			if (link_recv_changeover_msg(&l_ptr, &buf)) { +			if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {  				msg = buf_msg(buf);  				seq_no = msg_seqno(msg);  				if (type == ORIGINAL_MSG) @@ -1786,7 +1727,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,  		l_ptr->proto_msg_queue = NULL;  	} -	if (link_blocked(l_ptr)) +	/* Don't send protocol message during link changeover */ +	if (l_ptr->exp_msg_count)  		return;  	/* Abort non-RESET send if communication with node is prohibited */ @@ -1861,12 +1803,6 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,  	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));  	buf->priority = TC_PRIO_CONTROL; -	/* Defer message if bearer is already blocked */ -	if (tipc_bearer_blocked(l_ptr->b_ptr)) { -		l_ptr->proto_msg_queue = buf; -		return; -	} -  	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);  	l_ptr->unacked_window = 0;  	kfree_skb(buf); @@ -1885,7 +1821,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  	u32 msg_tol;  	struct tipc_msg *msg = buf_msg(buf); -	if (link_blocked(l_ptr)) +	/* Discard protocol message during link changeover */ +	if (l_ptr->exp_msg_count)  		goto exit;  	/* record unnumbered packet arrival (force mismatch on next timeout) */ @@ -1895,8 +1832,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  		if (tipc_own_addr > msg_prevnode(msg))  			l_ptr->b_ptr->net_plane = msg_net_plane(msg); -	l_ptr->owner->permit_changeover = msg_redundant_link(msg); -  	switch (msg_type(msg)) {  	case RESET_MSG: @@ -2012,13 +1947,13 @@ exit:  } -/* - * tipc_link_tunnel(): Send one message via a link belonging to - * another bearer. Owner node is locked. +/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to + * a different bearer. Owner node is locked.   */ -static void tipc_link_tunnel(struct tipc_link *l_ptr, -			     struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, -			     u32 selector) +static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, +				  struct tipc_msg *tunnel_hdr, +				  struct tipc_msg *msg, +				  u32 selector)  {  	struct tipc_link *tunnel;  	struct sk_buff *buf; @@ -2041,12 +1976,13 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr,  } - -/* - * changeover(): Send whole message queue via the remaining link - *               Owner node is locked. +/* tipc_link_failover_send_queue(): A link has gone down, but a second + * link is still active. We can do failover. Tunnel the failing link's + * whole send queue via the remaining link. This way, we don't lose + * any packets, and sequence order is preserved for subsequent traffic + * sent over the remaining link. Owner node is locked.   */ -void tipc_link_changeover(struct tipc_link *l_ptr) +void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  {  	u32 msgcount = l_ptr->out_queue_size;  	struct sk_buff *crs = l_ptr->first_out; @@ -2057,11 +1993,6 @@ void tipc_link_changeover(struct tipc_link *l_ptr)  	if (!tunnel)  		return; -	if (!l_ptr->owner->permit_changeover) { -		pr_warn("%speer did not permit changeover\n", link_co_err); -		return; -	} -  	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,  		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);  	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); @@ -2095,20 +2026,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr)  			msgcount = msg_msgcnt(msg);  			while (msgcount--) {  				msg_set_seqno(m, msg_seqno(msg)); -				tipc_link_tunnel(l_ptr, &tunnel_hdr, m, -						 msg_link_selector(m)); +				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, +						      msg_link_selector(m));  				pos += align(msg_size(m));  				m = (struct tipc_msg *)pos;  			}  		} else { -			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, -					 msg_link_selector(msg)); +			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, +					      msg_link_selector(msg));  		}  		crs = crs->next;  	}  } -void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel) +/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a + * duplicate of the first link's send queue via the new link. This way, we + * are guaranteed that currently queued packets from a socket are delivered + * before future traffic from the same socket, even if this is using the + * new link. The last arriving copy of each duplicate packet is dropped at + * the receiving end by the regular protocol check, so packet cardinality + * and sequence order is preserved per sender/receiver socket pair. + * Owner node is locked. + */ +void tipc_link_dup_send_queue(struct tipc_link *l_ptr, +			      struct tipc_link *tunnel)  {  	struct sk_buff *iter;  	struct tipc_msg tunnel_hdr; @@ -2164,12 +2105,14 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)  	return eb;  } -/* - *  link_recv_changeover_msg(): Receive tunneled packet sent - *  via other link. Node is locked. Return extracted buffer. +/*  tipc_link_tunnel_rcv(): Receive a tunneled packet, sent + *  via other link as result of a failover (ORIGINAL_MSG) or + *  a new active link (DUPLICATE_MSG). Failover packets are + *  returned to the active link for delivery upwards. + *  Owner node is locked.   */ -static int link_recv_changeover_msg(struct tipc_link **l_ptr, -				    struct sk_buff **buf) +static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, +				struct sk_buff **buf)  {  	struct sk_buff *tunnel_buf = *buf;  	struct tipc_link *dest_link; @@ -2306,11 +2249,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);  		if (fragm == NULL) {  			kfree_skb(buf); -			while (buf_chain) { -				buf = buf_chain; -				buf_chain = buf_chain->next; -				kfree_skb(buf); -			} +			kfree_skb_list(buf_chain);  			return -ENOMEM;  		}  		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); |