diff options
Diffstat (limited to 'net/tipc/link.c')
| -rw-r--r-- | net/tipc/link.c | 232 | 
1 files changed, 99 insertions, 133 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index a79c755cb417..daa6080a2a0c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@  /*   * net/tipc/link.c: TIPC link code   * - * Copyright (c) 1996-2007, Ericsson AB + * Copyright (c) 1996-2007, 2012, Ericsson AB   * Copyright (c) 2004-2007, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -97,12 +97,13 @@ static int  link_send_sections_long(struct tipc_port *sender,  				    struct iovec const *msg_sect,  				    u32 num_sect, unsigned int total_len,  				    u32 destnode); -static void link_check_defragm_bufs(struct tipc_link *l_ptr);  static void link_state_event(struct tipc_link *l_ptr, u32 event);  static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str);  static void link_start(struct tipc_link *l_ptr);  static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_send_sync(struct tipc_link *l); +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);  /*   *  Simple link routines @@ -269,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)  	}  	/* do all other link processing performed on a periodic basis */ -	link_check_defragm_bufs(l_ptr);  	link_state_event(l_ptr, TIMEOUT_EVT); @@ -712,6 +712,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			link_activate(l_ptr);  			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++; +			if (l_ptr->owner->working_links == 1) +				tipc_link_send_sync(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG: @@ -745,6 +747,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			link_activate(l_ptr);  			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++; +			if (l_ptr->owner->working_links == 1) +				tipc_link_send_sync(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG: @@ -872,17 +876,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		return link_send_long_buf(l_ptr, buf);  	/* Packet can be queued or sent. */ -	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && +	if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&  		   !link_congested(l_ptr))) {  		link_add_to_outqueue(l_ptr, buf, msg); -		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { -			l_ptr->unacked_window = 0; -		} else { -			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -			l_ptr->stats.bearer_congs++; -			l_ptr->next_out = buf; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->unacked_window = 0;  		return dsz;  	}  	/* Congestion: can message be bundled ? */ @@ -891,10 +890,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		/* Try adding message to an existing bundle */  		if (l_ptr->next_out && -		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { -			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); +		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))  			return dsz; -		}  		/* Try creating a new bundle */  		if (size <= max_packet * 2 / 3) { @@ -917,7 +914,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  	if (!l_ptr->next_out)  		l_ptr->next_out = buf;  	link_add_to_outqueue(l_ptr, buf, msg); -	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);  	return dsz;  } @@ -949,7 +945,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)  	return res;  } -/** +/* + * tipc_link_send_sync - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + * + * Called with node locked + */ +static void tipc_link_send_sync(struct tipc_link *l) +{ +	struct sk_buff *buf; +	struct tipc_msg *msg; + +	buf = tipc_buf_acquire(INT_H_SIZE); +	if (!buf) +		return; + +	msg = buf_msg(buf); +	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); +	msg_set_last_bcast(msg, l->owner->bclink.acked); +	link_add_chain_to_outqueue(l, buf, 0); +	tipc_link_push_queue(l); +} + +/* + * tipc_link_recv_sync - synchronize broadcast link endpoints. + * Receive the sequence number where we should start receiving and + * acking broadcast packets from a newly added peer node, and open + * up for reception of such packets. + * + * Called with node locked + */ +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +{ +	struct tipc_msg *msg = buf_msg(buf); + +	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); +	n->bclink.recv_permitted = true; +	kfree_skb(buf); +} + +/*   * tipc_link_send_names - send name table entries to new neighbor   *   * Send routine for bulk delivery of name table messages when contact @@ -1006,16 +1043,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,  	if (likely(!link_congested(l_ptr))) {  		if (likely(msg_size(msg) <= l_ptr->max_pkt)) { -			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { +			if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {  				link_add_to_outqueue(l_ptr, buf, msg); -				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, -							    &l_ptr->media_addr))) { -					l_ptr->unacked_window = 0; -					return res; -				} -				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -				l_ptr->stats.bearer_congs++; -				l_ptr->next_out = buf; +				tipc_bearer_send(l_ptr->b_ptr, buf, +						 &l_ptr->media_addr); +				l_ptr->unacked_window = 0;  				return res;  			}  		} else @@ -1106,7 +1138,7 @@ exit:  			/* Exit if link (or bearer) is congested */  			if (link_congested(l_ptr) || -			    !list_empty(&l_ptr->b_ptr->cong_links)) { +			    tipc_bearer_blocked(l_ptr->b_ptr)) {  				res = link_schedule_port(l_ptr,  							 sender->ref, res);  				goto exit; @@ -1329,15 +1361,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  	if (r_q_size && buf) {  		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			l_ptr->retransm_queue_head = mod(++r_q_head); -			l_ptr->retransm_queue_size = --r_q_size; -			l_ptr->stats.retransmitted++; -			return 0; -		} else { -			l_ptr->stats.bearer_congs++; -			return PUSH_FAILED; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->retransm_queue_head = mod(++r_q_head); +		l_ptr->retransm_queue_size = --r_q_size; +		l_ptr->stats.retransmitted++; +		return 0;  	}  	/* Send deferred protocol message, if any: */ @@ -1345,15 +1373,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  	if (buf) {  		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			l_ptr->unacked_window = 0; -			kfree_skb(buf); -			l_ptr->proto_msg_queue = NULL; -			return 0; -		} else { -			l_ptr->stats.bearer_congs++; -			return PUSH_FAILED; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->unacked_window = 0; +		kfree_skb(buf); +		l_ptr->proto_msg_queue = NULL; +		return 0;  	}  	/* Send one deferred data message, if send window not full: */ @@ -1366,18 +1390,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  		if (mod(next - first) < l_ptr->queue_limit[0]) {  			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));  			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -				if (msg_user(msg) == MSG_BUNDLER) -					msg_set_type(msg, CLOSED_MSG); -				l_ptr->next_out = buf->next; -				return 0; -			} else { -				l_ptr->stats.bearer_congs++; -				return PUSH_FAILED; -			} +			tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +			if (msg_user(msg) == MSG_BUNDLER) +				msg_set_type(msg, CLOSED_MSG); +			l_ptr->next_out = buf->next; +			return 0;  		}  	} -	return PUSH_FINISHED; +	return 1;  }  /* @@ -1388,15 +1408,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)  {  	u32 res; -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) +	if (tipc_bearer_blocked(l_ptr->b_ptr))  		return;  	do {  		res = tipc_link_push_packet(l_ptr);  	} while (!res); - -	if (res == PUSH_FAILED) -		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);  }  static void link_reset_all(unsigned long addr) @@ -1454,9 +1471,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,  		tipc_addr_string_fill(addr_string, n_ptr->addr);  		pr_info("Broadcast link info for %s\n", addr_string); -		pr_info("Supportable: %d,  Supported: %d,  Acked: %u\n", -			n_ptr->bclink.supportable, -			n_ptr->bclink.supported, +		pr_info("Reception permitted: %d,  Acked: %u\n", +			n_ptr->bclink.recv_permitted,  			n_ptr->bclink.acked);  		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",  			n_ptr->bclink.last_in, @@ -1481,7 +1497,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  	msg = buf_msg(buf); -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { +	if (tipc_bearer_blocked(l_ptr->b_ptr)) {  		if (l_ptr->retransm_queue_size == 0) {  			l_ptr->retransm_queue_head = msg_seqno(msg);  			l_ptr->retransm_queue_size = retransmits; @@ -1491,7 +1507,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  		}  		return;  	} else { -		/* Detect repeated retransmit failures on uncongested bearer */ +		/* Detect repeated retransmit failures on unblocked bearer */  		if (l_ptr->last_retransmitted == msg_seqno(msg)) {  			if (++l_ptr->stale_count > 100) {  				link_retransmit_failure(l_ptr, buf); @@ -1507,17 +1523,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  		msg = buf_msg(buf);  		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			buf = buf->next; -			retransmits--; -			l_ptr->stats.retransmitted++; -		} else { -			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -			l_ptr->stats.bearer_congs++; -			l_ptr->retransm_queue_head = buf_seqno(buf); -			l_ptr->retransm_queue_size = retransmits; -			return; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		buf = buf->next; +		retransmits--; +		l_ptr->stats.retransmitted++;  	}  	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; @@ -1676,7 +1685,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)  		ackd = msg_ack(msg);  		/* Release acked messages */ -		if (n_ptr->bclink.supported) +		if (n_ptr->bclink.recv_permitted)  			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));  		crs = l_ptr->first_out; @@ -1727,9 +1736,14 @@ deliver:  					tipc_link_recv_bundle(buf);  					continue;  				case NAME_DISTRIBUTOR: +					n_ptr->bclink.recv_permitted = true;  					tipc_node_unlock(n_ptr);  					tipc_named_recv(buf);  					continue; +				case BCAST_PROTOCOL: +					tipc_link_recv_sync(n_ptr, buf); +					tipc_node_unlock(n_ptr); +					continue;  				case CONN_MANAGER:  					tipc_node_unlock(n_ptr);  					tipc_port_recv_proto_msg(buf); @@ -1772,16 +1786,19 @@ deliver:  			continue;  		} +		/* Link is not in state WORKING_WORKING */  		if (msg_user(msg) == LINK_PROTOCOL) {  			link_recv_proto_msg(l_ptr, buf);  			head = link_insert_deferred_queue(l_ptr, head);  			tipc_node_unlock(n_ptr);  			continue;  		} + +		/* Traffic message. Conditionally activate link */  		link_state_event(l_ptr, TRAFFIC_MSG_EVT);  		if (link_working_working(l_ptr)) { -			/* Re-insert in front of queue */ +			/* Re-insert buffer in front of queue */  			buf->next = head;  			head = buf;  			tipc_node_unlock(n_ptr); @@ -1972,21 +1989,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,  	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); -	/* Defer message if bearer is already congested */ -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { -		l_ptr->proto_msg_queue = buf; -		return; -	} - -	/* Defer message if attempting to send results in bearer congestion */ -	if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); +	/* Defer message if bearer is already blocked */ +	if (tipc_bearer_blocked(l_ptr->b_ptr)) {  		l_ptr->proto_msg_queue = buf; -		l_ptr->stats.bearer_congs++;  		return;  	} -	/* Discard message if it was sent successfully */ +	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);  	l_ptr->unacked_window = 0;  	kfree_skb(buf);  } @@ -2057,7 +2066,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  		} else {  			l_ptr->max_pkt = l_ptr->max_pkt_target;  		} -		l_ptr->owner->bclink.supportable = (max_pkt_info != 0);  		/* Synchronize broadcast link info, if not done previously */  		if (!tipc_node_is_up(l_ptr->owner)) { @@ -2112,7 +2120,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  		}  		/* Protocol message before retransmits, reduce loss risk */ -		if (l_ptr->owner->bclink.supported) +		if (l_ptr->owner->bclink.recv_permitted)  			tipc_bclink_update_link_state(l_ptr->owner,  						      msg_last_bcast(msg)); @@ -2487,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)  	msg_set_bcast_ack(buf_msg(buf), exp);  } -static u32 get_timer_cnt(struct sk_buff *buf) -{ -	return msg_reroute_cnt(buf_msg(buf)); -} - -static void incr_timer_cnt(struct sk_buff *buf) -{ -	msg_incr_reroute_cnt(buf_msg(buf)); -} -  /*   * tipc_link_recv_fragment(): Called with node lock on. Returns   * the reassembled buffer if message is complete. @@ -2575,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,  	return 0;  } -/** - * link_check_defragm_bufs - flush stale incoming message fragments - * @l_ptr: pointer to link - */ -static void link_check_defragm_bufs(struct tipc_link *l_ptr) -{ -	struct sk_buff *prev = NULL; -	struct sk_buff *next = NULL; -	struct sk_buff *buf = l_ptr->defragm_buf; - -	if (!buf) -		return; -	if (!link_working_working(l_ptr)) -		return; -	while (buf) { -		u32 cnt = get_timer_cnt(buf); - -		next = buf->next; -		if (cnt < 4) { -			incr_timer_cnt(buf); -			prev = buf; -		} else { -			if (prev) -				prev->next = buf->next; -			else -				l_ptr->defragm_buf = buf->next; -			kfree_skb(buf); -		} -		buf = next; -	} -} -  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)  {  	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) @@ -2937,8 +2903,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)  			     s->sent_nacks, s->sent_acks, s->retransmitted);  	ret += tipc_snprintf(buf + ret, buf_size - ret, -			     "  Congestion bearer:%u link:%u  Send queue" -			     " max:%u avg:%u\n", s->bearer_congs, s->link_congs, +			     "  Congestion link:%u  Send queue" +			     " max:%u avg:%u\n", s->link_congs,  			     s->max_queue_sz, s->queue_sz_counts ?  			     (s->accu_queue_sz / s->queue_sz_counts) : 0);  |