diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/Kconfig | 13 | ||||
| -rw-r--r-- | net/tipc/bcast.c | 27 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 110 | ||||
| -rw-r--r-- | net/tipc/bearer.h | 24 | ||||
| -rw-r--r-- | net/tipc/core.c | 5 | ||||
| -rw-r--r-- | net/tipc/discover.c | 2 | ||||
| -rw-r--r-- | net/tipc/link.c | 232 | ||||
| -rw-r--r-- | net/tipc/link.h | 4 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 2 | ||||
| -rw-r--r-- | net/tipc/node.c | 15 | ||||
| -rw-r--r-- | net/tipc/node.h | 6 | ||||
| -rw-r--r-- | net/tipc/port.c | 32 | ||||
| -rw-r--r-- | net/tipc/port.h | 6 | ||||
| -rw-r--r-- | net/tipc/socket.c | 411 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 2 | 
15 files changed, 410 insertions, 481 deletions
| diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig index 585460180ffb..bc41bd31eadc 100644 --- a/net/tipc/Kconfig +++ b/net/tipc/Kconfig @@ -20,18 +20,9 @@ menuconfig TIPC  	  If in doubt, say N. -if TIPC - -config TIPC_ADVANCED -	bool "Advanced TIPC configuration" -	default n -	help -	  Saying Y here will open some advanced configuration for TIPC. -	  Most users do not need to bother; if unsure, just say N. -  config TIPC_PORTS  	int "Maximum number of ports in a node" -	depends on TIPC_ADVANCED +	depends on TIPC  	range 127 65535  	default "8191"  	help @@ -40,5 +31,3 @@ config TIPC_PORTS  	  Setting this to a smaller value saves some memory,  	  setting it to higher allows for more ports. - -endif # TIPC diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e4e6d8cd47e6..54f89f90ac33 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)  	tipc_node_lock(n_ptr); -	if (n_ptr->bclink.supported && +	if (n_ptr->bclink.recv_permitted &&  	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&  	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))  		n_ptr->bclink.oos_state = 2; @@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)  		goto exit;  	tipc_node_lock(node); -	if (unlikely(!node->bclink.supported)) +	if (unlikely(!node->bclink.recv_permitted))  		goto unlock;  	/* Handle broadcast protocol message */ @@ -564,7 +564,7 @@ exit:  u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)  { -	return (n_ptr->bclink.supported && +	return (n_ptr->bclink.recv_permitted &&  		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));  } @@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,  		if (bcbearer->remains_new.count == bcbearer->remains.count)  			continue;	/* bearer pair doesn't add anything */ -		if (p->blocked || -		    p->media->send_msg(buf, p, &p->media->bcast_addr)) { +		if (!tipc_bearer_blocked(p)) +			tipc_bearer_send(p, buf, &p->media->bcast_addr); +		else if (s && !tipc_bearer_blocked(s))  			/* unable to send on primary bearer */ -			if (!s || s->blocked || -			    s->media->send_msg(buf, s, -					       &s->media->bcast_addr)) { -				/* unable to send on either bearer */ -				continue; -			} -		} +			tipc_bearer_send(s, buf, &s->media->bcast_addr); +		else +			/* unable to send on either bearer */ +			continue;  		if (s) {  			bcbearer->bpairs[bp_index].primary = s; @@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)  			     "  TX naks:%u acks:%u dups:%u\n",  			     s->sent_nacks, s->sent_acks, s->retransmitted);  	ret += tipc_snprintf(buf + ret, buf_size - ret, -			     "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n", -			     s->bearer_congs, s->link_congs, s->max_queue_sz, +			     "  Congestion link:%u  Send queue max:%u avg:%u\n", +			     s->link_congs, s->max_queue_sz,  			     s->queue_sz_counts ?  			     (s->accu_queue_sz / s->queue_sz_counts) : 0); @@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)  void tipc_bclink_init(void)  { -	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);  	bcbearer->bearer.media = &bcbearer->media;  	bcbearer->media.send_msg = tipc_bcbearer_send;  	sprintf(bcbearer->media.name, "tipc-broadcast"); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 4ec5c80e8a7c..aa62f93a9127 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -279,116 +279,31 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)  }  /* - * bearer_push(): Resolve bearer congestion. Force the waiting - * links to push out their unsent packets, one packet per link - * per iteration, until all packets are gone or congestion reoccurs. - * 'tipc_net_lock' is read_locked when this function is called - * bearer.lock must be taken before calling - * Returns binary true(1) ore false(0) - */ -static int bearer_push(struct tipc_bearer *b_ptr) -{ -	u32 res = 0; -	struct tipc_link *ln, *tln; - -	if (b_ptr->blocked) -		return 0; - -	while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) { -		list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) { -			res = tipc_link_push_packet(ln); -			if (res == PUSH_FAILED) -				break; -			if (res == PUSH_FINISHED) -				list_move_tail(&ln->link_list, &b_ptr->links); -		} -	} -	return list_empty(&b_ptr->cong_links); -} - -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr) -{ -	spin_lock_bh(&b_ptr->lock); -	bearer_push(b_ptr); -	spin_unlock_bh(&b_ptr->lock); -} - - -/* - * Interrupt enabling new requests after bearer congestion or blocking: + * Interrupt enabling new requests after bearer blocking:   * See bearer_send().   */ -void tipc_continue(struct tipc_bearer *b_ptr) +void tipc_continue(struct tipc_bearer *b)  { -	spin_lock_bh(&b_ptr->lock); -	if (!list_empty(&b_ptr->cong_links)) -		tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr); -	b_ptr->blocked = 0; -	spin_unlock_bh(&b_ptr->lock); +	spin_lock_bh(&b->lock); +	b->blocked = 0; +	spin_unlock_bh(&b->lock);  }  /* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here - * bearer.lock is busy + * tipc_bearer_blocked - determines if bearer is currently blocked   */ -static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr, -						struct tipc_link *l_ptr) +int tipc_bearer_blocked(struct tipc_bearer *b)  { -	list_move_tail(&l_ptr->link_list, &b_ptr->cong_links); -} - -/* - * Schedule link for sending of messages after the bearer - * has been deblocked by 'continue()'. This method is called - * when somebody tries to send a message via this link while - * the bearer is congested. 'tipc_net_lock' is in read_lock here, - * bearer.lock is free - */ -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ -	spin_lock_bh(&b_ptr->lock); -	tipc_bearer_schedule_unlocked(b_ptr, l_ptr); -	spin_unlock_bh(&b_ptr->lock); -} - +	int res; -/* - * tipc_bearer_resolve_congestion(): Check if there is bearer congestion, - * and if there is, try to resolve it before returning. - * 'tipc_net_lock' is read_locked when this function is called - */ -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, -					struct tipc_link *l_ptr) -{ -	int res = 1; +	spin_lock_bh(&b->lock); +	res = b->blocked; +	spin_unlock_bh(&b->lock); -	if (list_empty(&b_ptr->cong_links)) -		return 1; -	spin_lock_bh(&b_ptr->lock); -	if (!bearer_push(b_ptr)) { -		tipc_bearer_schedule_unlocked(b_ptr, l_ptr); -		res = 0; -	} -	spin_unlock_bh(&b_ptr->lock);  	return res;  }  /** - * tipc_bearer_congested - determines if bearer is currently congested - */ -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr) -{ -	if (unlikely(b_ptr->blocked)) -		return 1; -	if (likely(list_empty(&b_ptr->cong_links))) -		return 0; -	return !tipc_bearer_resolve_congestion(b_ptr, l_ptr); -} - -/**   * tipc_enable_bearer - enable bearer with the given name   */  int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority) @@ -489,7 +404,6 @@ restart:  	b_ptr->net_plane = bearer_id + 'A';  	b_ptr->active = 1;  	b_ptr->priority = priority; -	INIT_LIST_HEAD(&b_ptr->cong_links);  	INIT_LIST_HEAD(&b_ptr->links);  	spin_lock_init(&b_ptr->lock); @@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)  	pr_info("Blocking bearer <%s>\n", name);  	spin_lock_bh(&b_ptr->lock);  	b_ptr->blocked = 1; -	list_splice_init(&b_ptr->cong_links, &b_ptr->links);  	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {  		struct tipc_node *n_ptr = l_ptr->owner; @@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)  	spin_lock_bh(&b_ptr->lock);  	b_ptr->blocked = 1;  	b_ptr->media->disable_bearer(b_ptr); -	list_splice_init(&b_ptr->cong_links, &b_ptr->links);  	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {  		tipc_link_delete(l_ptr);  	} diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dd4c2abf08e7..39f1192d04bf 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -120,7 +120,6 @@ struct tipc_media {   * @identity: array index of this bearer within TIPC bearer array   * @link_req: ptr to (optional) structure making periodic link setup requests   * @links: list of non-congested links associated with bearer - * @cong_links: list of congested links associated with bearer   * @active: non-zero if bearer structure is represents a bearer   * @net_plane: network plane ('A' through 'H') currently associated with bearer   * @nodes: indicates which nodes in cluster can be reached through bearer @@ -143,7 +142,6 @@ struct tipc_bearer {  	u32 identity;  	struct tipc_link_req *link_req;  	struct list_head links; -	struct list_head cong_links;  	int active;  	char net_plane;  	struct tipc_node_map nodes; @@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);  struct sk_buff *tipc_bearer_get_names(void);  void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);  void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); -void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);  struct tipc_bearer *tipc_bearer_find(const char *name);  struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);  struct tipc_media *tipc_media_find(const char *name); -int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, -				   struct tipc_link *l_ptr); -int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr); +int tipc_bearer_blocked(struct tipc_bearer *b_ptr);  void tipc_bearer_stop(void); -void tipc_bearer_lock_push(struct tipc_bearer *b_ptr); -  /**   * tipc_bearer_send- sends buffer to destination over bearer   * - * Returns true (1) if successful, or false (0) if unable to send - *   * IMPORTANT:   * The media send routine must not alter the buffer being passed in   * as it may be needed for later retransmission! - * - * If the media send routine returns a non-zero value (indicating that - * it was unable to send the buffer), it must: - *   1) mark the bearer as blocked, - *   2) call tipc_continue() once the bearer is able to send again. - * Media types that are unable to meet these two critera must ensure their - * send routine always returns success -- even if the buffer was not sent -- - * and let TIPC's link code deal with the undelivered message.   */ -static inline int tipc_bearer_send(struct tipc_bearer *b_ptr, -				   struct sk_buff *buf, +static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,  				   struct tipc_media_addr *dest)  { -	return !b_ptr->media->send_msg(buf, b_ptr, dest); +	b->media->send_msg(buf, b, dest);  }  #endif	/* _TIPC_BEARER_H */ diff --git a/net/tipc/core.c b/net/tipc/core.c index bfe8af88469a..fc05cecd7481 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -42,11 +42,6 @@  #include <linux/module.h> -#ifndef CONFIG_TIPC_PORTS -#define CONFIG_TIPC_PORTS 8191 -#endif - -  /* global variables used by multiple sub-systems within TIPC */  int tipc_random __read_mostly; diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 50eaa403eb6e..1074b9587e81 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)  	if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {  		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);  		if (rbuf) { -			b_ptr->media->send_msg(rbuf, b_ptr, &media_addr); +			tipc_bearer_send(b_ptr, rbuf, &media_addr);  			kfree_skb(rbuf);  		}  	} diff --git a/net/tipc/link.c b/net/tipc/link.c index a79c755cb417..daa6080a2a0c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@  /*   * net/tipc/link.c: TIPC link code   * - * Copyright (c) 1996-2007, Ericsson AB + * Copyright (c) 1996-2007, 2012, Ericsson AB   * Copyright (c) 2004-2007, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -97,12 +97,13 @@ static int  link_send_sections_long(struct tipc_port *sender,  				    struct iovec const *msg_sect,  				    u32 num_sect, unsigned int total_len,  				    u32 destnode); -static void link_check_defragm_bufs(struct tipc_link *l_ptr);  static void link_state_event(struct tipc_link *l_ptr, u32 event);  static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str);  static void link_start(struct tipc_link *l_ptr);  static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_send_sync(struct tipc_link *l); +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);  /*   *  Simple link routines @@ -269,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)  	}  	/* do all other link processing performed on a periodic basis */ -	link_check_defragm_bufs(l_ptr);  	link_state_event(l_ptr, TIMEOUT_EVT); @@ -712,6 +712,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			link_activate(l_ptr);  			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++; +			if (l_ptr->owner->working_links == 1) +				tipc_link_send_sync(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG: @@ -745,6 +747,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			link_activate(l_ptr);  			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++; +			if (l_ptr->owner->working_links == 1) +				tipc_link_send_sync(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG: @@ -872,17 +876,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		return link_send_long_buf(l_ptr, buf);  	/* Packet can be queued or sent. */ -	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && +	if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&  		   !link_congested(l_ptr))) {  		link_add_to_outqueue(l_ptr, buf, msg); -		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { -			l_ptr->unacked_window = 0; -		} else { -			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -			l_ptr->stats.bearer_congs++; -			l_ptr->next_out = buf; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->unacked_window = 0;  		return dsz;  	}  	/* Congestion: can message be bundled ? */ @@ -891,10 +890,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  		/* Try adding message to an existing bundle */  		if (l_ptr->next_out && -		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { -			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); +		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))  			return dsz; -		}  		/* Try creating a new bundle */  		if (size <= max_packet * 2 / 3) { @@ -917,7 +914,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  	if (!l_ptr->next_out)  		l_ptr->next_out = buf;  	link_add_to_outqueue(l_ptr, buf, msg); -	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);  	return dsz;  } @@ -949,7 +945,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)  	return res;  } -/** +/* + * tipc_link_send_sync - synchronize broadcast link endpoints. + * + * Give a newly added peer node the sequence number where it should + * start receiving and acking broadcast packets. + * + * Called with node locked + */ +static void tipc_link_send_sync(struct tipc_link *l) +{ +	struct sk_buff *buf; +	struct tipc_msg *msg; + +	buf = tipc_buf_acquire(INT_H_SIZE); +	if (!buf) +		return; + +	msg = buf_msg(buf); +	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr); +	msg_set_last_bcast(msg, l->owner->bclink.acked); +	link_add_chain_to_outqueue(l, buf, 0); +	tipc_link_push_queue(l); +} + +/* + * tipc_link_recv_sync - synchronize broadcast link endpoints. + * Receive the sequence number where we should start receiving and + * acking broadcast packets from a newly added peer node, and open + * up for reception of such packets. + * + * Called with node locked + */ +static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +{ +	struct tipc_msg *msg = buf_msg(buf); + +	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg); +	n->bclink.recv_permitted = true; +	kfree_skb(buf); +} + +/*   * tipc_link_send_names - send name table entries to new neighbor   *   * Send routine for bulk delivery of name table messages when contact @@ -1006,16 +1043,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,  	if (likely(!link_congested(l_ptr))) {  		if (likely(msg_size(msg) <= l_ptr->max_pkt)) { -			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { +			if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {  				link_add_to_outqueue(l_ptr, buf, msg); -				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, -							    &l_ptr->media_addr))) { -					l_ptr->unacked_window = 0; -					return res; -				} -				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -				l_ptr->stats.bearer_congs++; -				l_ptr->next_out = buf; +				tipc_bearer_send(l_ptr->b_ptr, buf, +						 &l_ptr->media_addr); +				l_ptr->unacked_window = 0;  				return res;  			}  		} else @@ -1106,7 +1138,7 @@ exit:  			/* Exit if link (or bearer) is congested */  			if (link_congested(l_ptr) || -			    !list_empty(&l_ptr->b_ptr->cong_links)) { +			    tipc_bearer_blocked(l_ptr->b_ptr)) {  				res = link_schedule_port(l_ptr,  							 sender->ref, res);  				goto exit; @@ -1329,15 +1361,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  	if (r_q_size && buf) {  		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			l_ptr->retransm_queue_head = mod(++r_q_head); -			l_ptr->retransm_queue_size = --r_q_size; -			l_ptr->stats.retransmitted++; -			return 0; -		} else { -			l_ptr->stats.bearer_congs++; -			return PUSH_FAILED; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->retransm_queue_head = mod(++r_q_head); +		l_ptr->retransm_queue_size = --r_q_size; +		l_ptr->stats.retransmitted++; +		return 0;  	}  	/* Send deferred protocol message, if any: */ @@ -1345,15 +1373,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  	if (buf) {  		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			l_ptr->unacked_window = 0; -			kfree_skb(buf); -			l_ptr->proto_msg_queue = NULL; -			return 0; -		} else { -			l_ptr->stats.bearer_congs++; -			return PUSH_FAILED; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		l_ptr->unacked_window = 0; +		kfree_skb(buf); +		l_ptr->proto_msg_queue = NULL; +		return 0;  	}  	/* Send one deferred data message, if send window not full: */ @@ -1366,18 +1390,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)  		if (mod(next - first) < l_ptr->queue_limit[0]) {  			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));  			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -				if (msg_user(msg) == MSG_BUNDLER) -					msg_set_type(msg, CLOSED_MSG); -				l_ptr->next_out = buf->next; -				return 0; -			} else { -				l_ptr->stats.bearer_congs++; -				return PUSH_FAILED; -			} +			tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +			if (msg_user(msg) == MSG_BUNDLER) +				msg_set_type(msg, CLOSED_MSG); +			l_ptr->next_out = buf->next; +			return 0;  		}  	} -	return PUSH_FINISHED; +	return 1;  }  /* @@ -1388,15 +1408,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)  {  	u32 res; -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) +	if (tipc_bearer_blocked(l_ptr->b_ptr))  		return;  	do {  		res = tipc_link_push_packet(l_ptr);  	} while (!res); - -	if (res == PUSH_FAILED) -		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);  }  static void link_reset_all(unsigned long addr) @@ -1454,9 +1471,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,  		tipc_addr_string_fill(addr_string, n_ptr->addr);  		pr_info("Broadcast link info for %s\n", addr_string); -		pr_info("Supportable: %d,  Supported: %d,  Acked: %u\n", -			n_ptr->bclink.supportable, -			n_ptr->bclink.supported, +		pr_info("Reception permitted: %d,  Acked: %u\n", +			n_ptr->bclink.recv_permitted,  			n_ptr->bclink.acked);  		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",  			n_ptr->bclink.last_in, @@ -1481,7 +1497,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  	msg = buf_msg(buf); -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { +	if (tipc_bearer_blocked(l_ptr->b_ptr)) {  		if (l_ptr->retransm_queue_size == 0) {  			l_ptr->retransm_queue_head = msg_seqno(msg);  			l_ptr->retransm_queue_size = retransmits; @@ -1491,7 +1507,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  		}  		return;  	} else { -		/* Detect repeated retransmit failures on uncongested bearer */ +		/* Detect repeated retransmit failures on unblocked bearer */  		if (l_ptr->last_retransmitted == msg_seqno(msg)) {  			if (++l_ptr->stale_count > 100) {  				link_retransmit_failure(l_ptr, buf); @@ -1507,17 +1523,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,  		msg = buf_msg(buf);  		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));  		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); -		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -			buf = buf->next; -			retransmits--; -			l_ptr->stats.retransmitted++; -		} else { -			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); -			l_ptr->stats.bearer_congs++; -			l_ptr->retransm_queue_head = buf_seqno(buf); -			l_ptr->retransm_queue_size = retransmits; -			return; -		} +		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); +		buf = buf->next; +		retransmits--; +		l_ptr->stats.retransmitted++;  	}  	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; @@ -1676,7 +1685,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)  		ackd = msg_ack(msg);  		/* Release acked messages */ -		if (n_ptr->bclink.supported) +		if (n_ptr->bclink.recv_permitted)  			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));  		crs = l_ptr->first_out; @@ -1727,9 +1736,14 @@ deliver:  					tipc_link_recv_bundle(buf);  					continue;  				case NAME_DISTRIBUTOR: +					n_ptr->bclink.recv_permitted = true;  					tipc_node_unlock(n_ptr);  					tipc_named_recv(buf);  					continue; +				case BCAST_PROTOCOL: +					tipc_link_recv_sync(n_ptr, buf); +					tipc_node_unlock(n_ptr); +					continue;  				case CONN_MANAGER:  					tipc_node_unlock(n_ptr);  					tipc_port_recv_proto_msg(buf); @@ -1772,16 +1786,19 @@ deliver:  			continue;  		} +		/* Link is not in state WORKING_WORKING */  		if (msg_user(msg) == LINK_PROTOCOL) {  			link_recv_proto_msg(l_ptr, buf);  			head = link_insert_deferred_queue(l_ptr, head);  			tipc_node_unlock(n_ptr);  			continue;  		} + +		/* Traffic message. Conditionally activate link */  		link_state_event(l_ptr, TRAFFIC_MSG_EVT);  		if (link_working_working(l_ptr)) { -			/* Re-insert in front of queue */ +			/* Re-insert buffer in front of queue */  			buf->next = head;  			head = buf;  			tipc_node_unlock(n_ptr); @@ -1972,21 +1989,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,  	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); -	/* Defer message if bearer is already congested */ -	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { -		l_ptr->proto_msg_queue = buf; -		return; -	} - -	/* Defer message if attempting to send results in bearer congestion */ -	if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { -		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); +	/* Defer message if bearer is already blocked */ +	if (tipc_bearer_blocked(l_ptr->b_ptr)) {  		l_ptr->proto_msg_queue = buf; -		l_ptr->stats.bearer_congs++;  		return;  	} -	/* Discard message if it was sent successfully */ +	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);  	l_ptr->unacked_window = 0;  	kfree_skb(buf);  } @@ -2057,7 +2066,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  		} else {  			l_ptr->max_pkt = l_ptr->max_pkt_target;  		} -		l_ptr->owner->bclink.supportable = (max_pkt_info != 0);  		/* Synchronize broadcast link info, if not done previously */  		if (!tipc_node_is_up(l_ptr->owner)) { @@ -2112,7 +2120,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  		}  		/* Protocol message before retransmits, reduce loss risk */ -		if (l_ptr->owner->bclink.supported) +		if (l_ptr->owner->bclink.recv_permitted)  			tipc_bclink_update_link_state(l_ptr->owner,  						      msg_last_bcast(msg)); @@ -2487,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)  	msg_set_bcast_ack(buf_msg(buf), exp);  } -static u32 get_timer_cnt(struct sk_buff *buf) -{ -	return msg_reroute_cnt(buf_msg(buf)); -} - -static void incr_timer_cnt(struct sk_buff *buf) -{ -	msg_incr_reroute_cnt(buf_msg(buf)); -} -  /*   * tipc_link_recv_fragment(): Called with node lock on. Returns   * the reassembled buffer if message is complete. @@ -2575,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,  	return 0;  } -/** - * link_check_defragm_bufs - flush stale incoming message fragments - * @l_ptr: pointer to link - */ -static void link_check_defragm_bufs(struct tipc_link *l_ptr) -{ -	struct sk_buff *prev = NULL; -	struct sk_buff *next = NULL; -	struct sk_buff *buf = l_ptr->defragm_buf; - -	if (!buf) -		return; -	if (!link_working_working(l_ptr)) -		return; -	while (buf) { -		u32 cnt = get_timer_cnt(buf); - -		next = buf->next; -		if (cnt < 4) { -			incr_timer_cnt(buf); -			prev = buf; -		} else { -			if (prev) -				prev->next = buf->next; -			else -				l_ptr->defragm_buf = buf->next; -			kfree_skb(buf); -		} -		buf = next; -	} -} -  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)  {  	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) @@ -2937,8 +2903,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)  			     s->sent_nacks, s->sent_acks, s->retransmitted);  	ret += tipc_snprintf(buf + ret, buf_size - ret, -			     "  Congestion bearer:%u link:%u  Send queue" -			     " max:%u avg:%u\n", s->bearer_congs, s->link_congs, +			     "  Congestion link:%u  Send queue" +			     " max:%u avg:%u\n", s->link_congs,  			     s->max_queue_sz, s->queue_sz_counts ?  			     (s->accu_queue_sz / s->queue_sz_counts) : 0); diff --git a/net/tipc/link.h b/net/tipc/link.h index 6e921121be06..c048ed1cbd76 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -40,9 +40,6 @@  #include "msg.h"  #include "node.h" -#define PUSH_FAILED   1 -#define PUSH_FINISHED 2 -  /*   * Out-of-range value for link sequence numbers   */ @@ -82,7 +79,6 @@ struct tipc_stats {  	u32 recv_fragmented;  	u32 recv_fragments;  	u32 link_congs;		/* # port sends blocked by congestion */ -	u32 bearer_congs;  	u32 deferred_recv;  	u32 duplicates;  	u32 max_queue_sz;	/* send queue size high water mark */ diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 55d3928dfd67..e0d08055754e 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)  	named_distribute(&message_list, node, &publ_zone, max_item_buf);  	read_unlock_bh(&tipc_nametbl_lock); -	tipc_link_send_names(&message_list, (u32)node); +	tipc_link_send_names(&message_list, node);  }  /** diff --git a/net/tipc/node.c b/net/tipc/node.c index d21db204e25a..48f39dd3eae8 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1,7 +1,7 @@  /*   * net/tipc/node.c: TIPC node management routines   * - * Copyright (c) 2000-2006, Ericsson AB + * Copyright (c) 2000-2006, 2012 Ericsson AB   * Copyright (c) 2005-2006, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -263,12 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  static void node_established_contact(struct tipc_node *n_ptr)  {  	tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); - -	if (n_ptr->bclink.supportable) { -		n_ptr->bclink.acked = tipc_bclink_get_last_sent(); -		tipc_bclink_add_node(n_ptr->addr); -		n_ptr->bclink.supported = 1; -	} +	n_ptr->bclink.oos_state = 0; +	n_ptr->bclink.acked = tipc_bclink_get_last_sent(); +	tipc_bclink_add_node(n_ptr->addr);  }  static void node_name_purge_complete(unsigned long node_addr) @@ -294,7 +291,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)  		tipc_addr_string_fill(addr_string, n_ptr->addr));  	/* Flush broadcast link info associated with lost node */ -	if (n_ptr->bclink.supported) { +	if (n_ptr->bclink.recv_permitted) {  		while (n_ptr->bclink.deferred_head) {  			struct sk_buff *buf = n_ptr->bclink.deferred_head;  			n_ptr->bclink.deferred_head = buf->next; @@ -310,7 +307,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)  		tipc_bclink_remove_node(n_ptr->addr);  		tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); -		n_ptr->bclink.supported = 0; +		n_ptr->bclink.recv_permitted = false;  	}  	/* Abort link changeover */ diff --git a/net/tipc/node.h b/net/tipc/node.h index cfcaf4d6e480..3c189b35b102 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -67,8 +67,6 @@   * @permit_changeover: non-zero if node has redundant links to this system   * @signature: node instance identifier   * @bclink: broadcast-related info - *    @supportable: non-zero if node supports TIPC b'cast link capability - *    @supported: non-zero if node supports TIPC b'cast capability   *    @acked: sequence # of last outbound b'cast message acknowledged by node   *    @last_in: sequence # of last in-sequence b'cast message received from node   *    @last_sent: sequence # of last b'cast message sent by node @@ -77,6 +75,7 @@   *    @deferred_head: oldest OOS b'cast message received from node   *    @deferred_tail: newest OOS b'cast message received from node   *    @defragm: list of partially reassembled b'cast message fragments from node + *    @recv_permitted: true if node is allowed to receive b'cast messages   */  struct tipc_node {  	u32 addr; @@ -92,8 +91,6 @@ struct tipc_node {  	int permit_changeover;  	u32 signature;  	struct { -		u8 supportable; -		u8 supported;  		u32 acked;  		u32 last_in;  		u32 last_sent; @@ -102,6 +99,7 @@ struct tipc_node {  		struct sk_buff *deferred_head;  		struct sk_buff *deferred_tail;  		struct sk_buff *defragm; +		bool recv_permitted;  	} bclink;  }; diff --git a/net/tipc/port.c b/net/tipc/port.c index 07c42fba672b..18098cac62f2 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy)  				if (unlikely(!cb))  					goto reject;  				if (unlikely(!connected)) { -					if (tipc_connect2port(dref, &orig)) +					if (tipc_connect(dref, &orig))  						goto reject;  				} else if (peer_invalid)  					goto reject; @@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)  	return res;  } -int tipc_connect2port(u32 ref, struct tipc_portid const *peer) +int tipc_connect(u32 ref, struct tipc_portid const *peer)  {  	struct tipc_port *p_ptr; -	struct tipc_msg *msg; -	int res = -EINVAL; +	int res;  	p_ptr = tipc_port_lock(ref);  	if (!p_ptr)  		return -EINVAL; +	res = __tipc_connect(ref, p_ptr, peer); +	tipc_port_unlock(p_ptr); +	return res; +} + +/* + * __tipc_connect - connect to a remote peer + * + * Port must be locked. + */ +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +			struct tipc_portid const *peer) +{ +	struct tipc_msg *msg; +	int res = -EINVAL; +  	if (p_ptr->published || p_ptr->connected)  		goto exit;  	if (!peer->ref) @@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)  			  (net_ev_handler)port_handle_node_down);  	res = 0;  exit: -	tipc_port_unlock(p_ptr);  	p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);  	return res;  } -/** - * tipc_disconnect_port - disconnect port from peer +/* + * __tipc_disconnect - disconnect port from peer   *   * Port must be locked.   */ -int tipc_disconnect_port(struct tipc_port *tp_ptr) +int __tipc_disconnect(struct tipc_port *tp_ptr)  {  	int res; @@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref)  	p_ptr = tipc_port_lock(ref);  	if (!p_ptr)  		return -EINVAL; -	res = tipc_disconnect_port(p_ptr); +	res = __tipc_disconnect(p_ptr);  	tipc_port_unlock(p_ptr);  	return res;  } diff --git a/net/tipc/port.h b/net/tipc/port.h index 4660e3065790..fb66e2e5f4d1 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope,  int tipc_withdraw(u32 portref, unsigned int scope,  		struct tipc_name_seq const *name_seq); -int tipc_connect2port(u32 portref, struct tipc_portid const *port); +int tipc_connect(u32 portref, struct tipc_portid const *port);  int tipc_disconnect(u32 portref); @@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref);  /*   * The following routines require that the port be locked on entry   */ -int tipc_disconnect_port(struct tipc_port *tp_ptr); +int __tipc_disconnect(struct tipc_port *tp_ptr); +int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +		   struct tipc_portid const *peer);  int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);  /* diff --git a/net/tipc/socket.c b/net/tipc/socket.c index fd5f042dbff4..9b4e4833a484 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,8 +1,8 @@  /*   * net/tipc/socket.c: TIPC socket API   * - * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2004-2008, 2010-2011, Wind River Systems + * Copyright (c) 2001-2007, 2012 Ericsson AB + * Copyright (c) 2004-2008, 2010-2012, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@  #define SS_LISTENING	-1	/* socket is listening */  #define SS_READY	-2	/* socket is connectionless */ -#define OVERLOAD_LIMIT_BASE	5000 +#define OVERLOAD_LIMIT_BASE	10000  #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */  struct tipc_sock { @@ -62,6 +62,8 @@ struct tipc_sock {  static int backlog_rcv(struct sock *sk, struct sk_buff *skb);  static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);  static void wakeupdispatch(struct tipc_port *tport); +static void tipc_data_ready(struct sock *sk, int len); +static void tipc_write_space(struct sock *sk);  static const struct proto_ops packet_ops;  static const struct proto_ops stream_ops; @@ -71,8 +73,6 @@ static struct proto tipc_proto;  static int sockets_enabled; -static atomic_t tipc_queue_size = ATOMIC_INIT(0); -  /*   * Revised TIPC socket locking policy:   * @@ -126,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);  static void advance_rx_queue(struct sock *sk)  {  	kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); -	atomic_dec(&tipc_queue_size);  }  /** @@ -138,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)  {  	struct sk_buff *buf; -	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { -		atomic_dec(&tipc_queue_size); +	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))  		kfree_skb(buf); -	}  }  /** @@ -153,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)  {  	struct sk_buff *buf; -	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { +	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))  		tipc_reject_msg(buf, TIPC_ERR_NO_PORT); -		atomic_dec(&tipc_queue_size); -	}  }  /** @@ -221,6 +216,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,  	sock_init_data(sock, sk);  	sk->sk_backlog_rcv = backlog_rcv;  	sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; +	sk->sk_data_ready = tipc_data_ready; +	sk->sk_write_space = tipc_write_space;  	tipc_sk(sk)->p = tp_ptr;  	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; @@ -276,7 +273,6 @@ static int release(struct socket *sock)  		buf = __skb_dequeue(&sk->sk_receive_queue);  		if (buf == NULL)  			break; -		atomic_dec(&tipc_queue_size);  		if (TIPC_SKB_CB(buf)->handle != 0)  			kfree_skb(buf);  		else { @@ -408,7 +404,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,   * socket state		flags set   * ------------		---------   * unconnected		no read flags - *			no write flags + *			POLLOUT if port is not congested   *   * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue   *			no write flags @@ -435,9 +431,13 @@ static unsigned int poll(struct file *file, struct socket *sock,  	struct sock *sk = sock->sk;  	u32 mask = 0; -	poll_wait(file, sk_sleep(sk), wait); +	sock_poll_wait(file, sk_sleep(sk), wait);  	switch ((int)sock->state) { +	case SS_UNCONNECTED: +		if (!tipc_sk_port(sk)->congested) +			mask |= POLLOUT; +		break;  	case SS_READY:  	case SS_CONNECTED:  		if (!tipc_sk_port(sk)->congested) @@ -775,16 +775,19 @@ exit:  static int auto_connect(struct socket *sock, struct tipc_msg *msg)  {  	struct tipc_sock *tsock = tipc_sk(sock->sk); - -	if (msg_errcode(msg)) { -		sock->state = SS_DISCONNECTING; -		return -ECONNREFUSED; -	} +	struct tipc_port *p_ptr;  	tsock->peer_name.ref = msg_origport(msg);  	tsock->peer_name.node = msg_orignode(msg); -	tipc_connect2port(tsock->p->ref, &tsock->peer_name); -	tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); +	p_ptr = tipc_port_deref(tsock->p->ref); +	if (!p_ptr) +		return -EINVAL; + +	__tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); + +	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) +		return -EINVAL; +	msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));  	sock->state = SS_CONNECTED;  	return 0;  } @@ -943,13 +946,6 @@ restart:  	sz = msg_data_sz(msg);  	err = msg_errcode(msg); -	/* Complete connection setup for an implied connect */ -	if (unlikely(sock->state == SS_CONNECTING)) { -		res = auto_connect(sock, msg); -		if (res) -			goto exit; -	} -  	/* Discard an empty non-errored message & try again */  	if ((!sz) && (!err)) {  		advance_rx_queue(sk); @@ -1126,6 +1122,39 @@ exit:  }  /** + * tipc_write_space - wake up thread if port congestion is released + * @sk: socket + */ +static void tipc_write_space(struct sock *sk) +{ +	struct socket_wq *wq; + +	rcu_read_lock(); +	wq = rcu_dereference(sk->sk_wq); +	if (wq_has_sleeper(wq)) +		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT | +						POLLWRNORM | POLLWRBAND); +	rcu_read_unlock(); +} + +/** + * tipc_data_ready - wake up threads to indicate messages have been received + * @sk: socket + * @len: the length of messages + */ +static void tipc_data_ready(struct sock *sk, int len) +{ +	struct socket_wq *wq; + +	rcu_read_lock(); +	wq = rcu_dereference(sk->sk_wq); +	if (wq_has_sleeper(wq)) +		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | +						POLLRDNORM | POLLRDBAND); +	rcu_read_unlock(); +} + +/**   * rx_queue_full - determine if receive queue can accept another message   * @msg: message to be added to queue   * @queue_size: current size of queue @@ -1154,6 +1183,83 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)  }  /** + * filter_connect - Handle all incoming messages for a connection-based socket + * @tsock: TIPC socket + * @msg: message + * + * Returns TIPC error status code and socket error status code + * once it encounters some errors + */ +static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +{ +	struct socket *sock = tsock->sk.sk_socket; +	struct tipc_msg *msg = buf_msg(*buf); +	struct sock *sk = &tsock->sk; +	u32 retval = TIPC_ERR_NO_PORT; +	int res; + +	if (msg_mcast(msg)) +		return retval; + +	switch ((int)sock->state) { +	case SS_CONNECTED: +		/* Accept only connection-based messages sent by peer */ +		if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { +			if (unlikely(msg_errcode(msg))) { +				sock->state = SS_DISCONNECTING; +				__tipc_disconnect(tsock->p); +			} +			retval = TIPC_OK; +		} +		break; +	case SS_CONNECTING: +		/* Accept only ACK or NACK message */ +		if (unlikely(msg_errcode(msg))) { +			sock->state = SS_DISCONNECTING; +			sk->sk_err = -ECONNREFUSED; +			retval = TIPC_OK; +			break; +		} + +		if (unlikely(!msg_connected(msg))) +			break; + +		res = auto_connect(sock, msg); +		if (res) { +			sock->state = SS_DISCONNECTING; +			sk->sk_err = res; +			retval = TIPC_OK; +			break; +		} + +		/* If an incoming message is an 'ACK-', it should be +		 * discarded here because it doesn't contain useful +		 * data. In addition, we should try to wake up +		 * connect() routine if sleeping. +		 */ +		if (msg_data_sz(msg) == 0) { +			kfree_skb(*buf); +			*buf = NULL; +			if (waitqueue_active(sk_sleep(sk))) +				wake_up_interruptible(sk_sleep(sk)); +		} +		retval = TIPC_OK; +		break; +	case SS_LISTENING: +	case SS_UNCONNECTED: +		/* Accept only SYN message */ +		if (!msg_connected(msg) && !(msg_errcode(msg))) +			retval = TIPC_OK; +		break; +	case SS_DISCONNECTING: +		break; +	default: +		pr_err("Unknown socket state %u\n", sock->state); +	} +	return retval; +} + +/**   * filter_rcv - validate incoming message   * @sk: socket   * @buf: message @@ -1170,6 +1276,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)  	struct socket *sock = sk->sk_socket;  	struct tipc_msg *msg = buf_msg(buf);  	u32 recv_q_len; +	u32 res = TIPC_OK;  	/* Reject message if it is wrong sort of message for socket */  	if (msg_type(msg) > TIPC_DIRECT_MSG) @@ -1179,32 +1286,12 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)  		if (msg_connected(msg))  			return TIPC_ERR_NO_PORT;  	} else { -		if (msg_mcast(msg)) -			return TIPC_ERR_NO_PORT; -		if (sock->state == SS_CONNECTED) { -			if (!msg_connected(msg) || -			    !tipc_port_peer_msg(tipc_sk_port(sk), msg)) -				return TIPC_ERR_NO_PORT; -		} else if (sock->state == SS_CONNECTING) { -			if (!msg_connected(msg) && (msg_errcode(msg) == 0)) -				return TIPC_ERR_NO_PORT; -		} else if (sock->state == SS_LISTENING) { -			if (msg_connected(msg) || msg_errcode(msg)) -				return TIPC_ERR_NO_PORT; -		} else if (sock->state == SS_DISCONNECTING) { -			return TIPC_ERR_NO_PORT; -		} else /* (sock->state == SS_UNCONNECTED) */ { -			if (msg_connected(msg) || msg_errcode(msg)) -				return TIPC_ERR_NO_PORT; -		} +		res = filter_connect(tipc_sk(sk), &buf); +		if (res != TIPC_OK || buf == NULL) +			return res;  	}  	/* Reject message if there isn't room to queue it */ -	recv_q_len = (u32)atomic_read(&tipc_queue_size); -	if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) { -		if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE)) -			return TIPC_ERR_OVERLOAD; -	}  	recv_q_len = skb_queue_len(&sk->sk_receive_queue);  	if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {  		if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) @@ -1213,17 +1300,9 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)  	/* Enqueue message (finally!) */  	TIPC_SKB_CB(buf)->handle = 0; -	atomic_inc(&tipc_queue_size);  	__skb_queue_tail(&sk->sk_receive_queue, buf); -	/* Initiate connection termination for an incoming 'FIN' */ -	if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { -		sock->state = SS_DISCONNECTING; -		tipc_disconnect_port(tipc_sk_port(sk)); -	} - -	if (waitqueue_active(sk_sleep(sk))) -		wake_up_interruptible(sk_sleep(sk)); +	sk->sk_data_ready(sk, 0);  	return TIPC_OK;  } @@ -1290,8 +1369,7 @@ static void wakeupdispatch(struct tipc_port *tport)  {  	struct sock *sk = (struct sock *)tport->usr_handle; -	if (waitqueue_active(sk_sleep(sk))) -		wake_up_interruptible(sk_sleep(sk)); +	sk->sk_write_space(sk);  }  /** @@ -1309,8 +1387,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  	struct sock *sk = sock->sk;  	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;  	struct msghdr m = {NULL,}; -	struct sk_buff *buf; -	struct tipc_msg *msg;  	unsigned int timeout;  	int res; @@ -1322,26 +1398,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  		goto exit;  	} -	/* For now, TIPC does not support the non-blocking form of connect() */ -	if (flags & O_NONBLOCK) { -		res = -EOPNOTSUPP; -		goto exit; -	} - -	/* Issue Posix-compliant error code if socket is in the wrong state */ -	if (sock->state == SS_LISTENING) { -		res = -EOPNOTSUPP; -		goto exit; -	} -	if (sock->state == SS_CONNECTING) { -		res = -EALREADY; -		goto exit; -	} -	if (sock->state != SS_UNCONNECTED) { -		res = -EISCONN; -		goto exit; -	} -  	/*  	 * Reject connection attempt using multicast address  	 * @@ -1353,49 +1409,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  		goto exit;  	} -	/* Reject any messages already in receive queue (very unlikely) */ -	reject_rx_queue(sk); +	timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; -	/* Send a 'SYN-' to destination */ -	m.msg_name = dest; -	m.msg_namelen = destlen; -	res = send_msg(NULL, sock, &m, 0); -	if (res < 0) +	switch (sock->state) { +	case SS_UNCONNECTED: +		/* Send a 'SYN-' to destination */ +		m.msg_name = dest; +		m.msg_namelen = destlen; + +		/* If connect is in non-blocking case, set MSG_DONTWAIT to +		 * indicate send_msg() is never blocked. +		 */ +		if (!timeout) +			m.msg_flags = MSG_DONTWAIT; + +		res = send_msg(NULL, sock, &m, 0); +		if ((res < 0) && (res != -EWOULDBLOCK)) +			goto exit; + +		/* Just entered SS_CONNECTING state; the only +		 * difference is that return value in non-blocking +		 * case is EINPROGRESS, rather than EALREADY. +		 */ +		res = -EINPROGRESS; +		break; +	case SS_CONNECTING: +		res = -EALREADY; +		break; +	case SS_CONNECTED: +		res = -EISCONN; +		break; +	default: +		res = -EINVAL;  		goto exit; +	} -	/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ -	timeout = tipc_sk(sk)->conn_timeout; -	release_sock(sk); -	res = wait_event_interruptible_timeout(*sk_sleep(sk), -			(!skb_queue_empty(&sk->sk_receive_queue) || -			(sock->state != SS_CONNECTING)), -			timeout ? (long)msecs_to_jiffies(timeout) -				: MAX_SCHEDULE_TIMEOUT); -	lock_sock(sk); +	if (sock->state == SS_CONNECTING) { +		if (!timeout) +			goto exit; -	if (res > 0) { -		buf = skb_peek(&sk->sk_receive_queue); -		if (buf != NULL) { -			msg = buf_msg(buf); -			res = auto_connect(sock, msg); -			if (!res) { -				if (!msg_data_sz(msg)) -					advance_rx_queue(sk); -			} -		} else { -			if (sock->state == SS_CONNECTED) -				res = -EISCONN; +		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ +		release_sock(sk); +		res = wait_event_interruptible_timeout(*sk_sleep(sk), +				sock->state != SS_CONNECTING, +				timeout ? (long)msecs_to_jiffies(timeout) +					: MAX_SCHEDULE_TIMEOUT); +		lock_sock(sk); +		if (res <= 0) { +			if (res == 0) +				res = -ETIMEDOUT;  			else -				res = -ECONNREFUSED; +				; /* leave "res" unchanged */ +			goto exit;  		} -	} else { -		if (res == 0) -			res = -ETIMEDOUT; -		else -			; /* leave "res" unchanged */ -		sock->state = SS_DISCONNECTING;  	} +	if (unlikely(sock->state == SS_DISCONNECTING)) +		res = sock_error(sk); +	else +		res = 0; +  exit:  	release_sock(sk);  	return res; @@ -1436,8 +1509,13 @@ static int listen(struct socket *sock, int len)   */  static int accept(struct socket *sock, struct socket *new_sock, int flags)  { -	struct sock *sk = sock->sk; +	struct sock *new_sk, *sk = sock->sk;  	struct sk_buff *buf; +	struct tipc_sock *new_tsock; +	struct tipc_port *new_tport; +	struct tipc_msg *msg; +	u32 new_ref; +  	int res;  	lock_sock(sk); @@ -1463,48 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  	buf = skb_peek(&sk->sk_receive_queue);  	res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); -	if (!res) { -		struct sock *new_sk = new_sock->sk; -		struct tipc_sock *new_tsock = tipc_sk(new_sk); -		struct tipc_port *new_tport = new_tsock->p; -		u32 new_ref = new_tport->ref; -		struct tipc_msg *msg = buf_msg(buf); - -		lock_sock(new_sk); - -		/* -		 * Reject any stray messages received by new socket -		 * before the socket lock was taken (very, very unlikely) -		 */ -		reject_rx_queue(new_sk); - -		/* Connect new socket to it's peer */ -		new_tsock->peer_name.ref = msg_origport(msg); -		new_tsock->peer_name.node = msg_orignode(msg); -		tipc_connect2port(new_ref, &new_tsock->peer_name); -		new_sock->state = SS_CONNECTED; - -		tipc_set_portimportance(new_ref, msg_importance(msg)); -		if (msg_named(msg)) { -			new_tport->conn_type = msg_nametype(msg); -			new_tport->conn_instance = msg_nameinst(msg); -		} +	if (res) +		goto exit; -		/* -		 * Respond to 'SYN-' by discarding it & returning 'ACK'-. -		 * Respond to 'SYN+' by queuing it on new socket. -		 */ -		if (!msg_data_sz(msg)) { -			struct msghdr m = {NULL,}; +	new_sk = new_sock->sk; +	new_tsock = tipc_sk(new_sk); +	new_tport = new_tsock->p; +	new_ref = new_tport->ref; +	msg = buf_msg(buf); -			advance_rx_queue(sk); -			send_packet(NULL, new_sock, &m, 0); -		} else { -			__skb_dequeue(&sk->sk_receive_queue); -			__skb_queue_head(&new_sk->sk_receive_queue, buf); -		} -		release_sock(new_sk); +	/* we lock on new_sk; but lockdep sees the lock on sk */ +	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING); + +	/* +	 * Reject any stray messages received by new socket +	 * before the socket lock was taken (very, very unlikely) +	 */ +	reject_rx_queue(new_sk); + +	/* Connect new socket to it's peer */ +	new_tsock->peer_name.ref = msg_origport(msg); +	new_tsock->peer_name.node = msg_orignode(msg); +	tipc_connect(new_ref, &new_tsock->peer_name); +	new_sock->state = SS_CONNECTED; + +	tipc_set_portimportance(new_ref, msg_importance(msg)); +	if (msg_named(msg)) { +		new_tport->conn_type = msg_nametype(msg); +		new_tport->conn_instance = msg_nameinst(msg);  	} + +	/* +	 * Respond to 'SYN-' by discarding it & returning 'ACK'-. +	 * Respond to 'SYN+' by queuing it on new socket. +	 */ +	if (!msg_data_sz(msg)) { +		struct msghdr m = {NULL,}; + +		advance_rx_queue(sk); +		send_packet(NULL, new_sock, &m, 0); +	} else { +		__skb_dequeue(&sk->sk_receive_queue); +		__skb_queue_head(&new_sk->sk_receive_queue, buf); +	} +	release_sock(new_sk); +  exit:  	release_sock(sk);  	return res; @@ -1539,7 +1620,6 @@ restart:  		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */  		buf = __skb_dequeue(&sk->sk_receive_queue);  		if (buf) { -			atomic_dec(&tipc_queue_size);  			if (TIPC_SKB_CB(buf)->handle != 0) {  				kfree_skb(buf);  				goto restart; @@ -1556,10 +1636,11 @@ restart:  	case SS_DISCONNECTING: -		/* Discard any unreceived messages; wake up sleeping tasks */ +		/* Discard any unreceived messages */  		discard_rx_queue(sk); -		if (waitqueue_active(sk_sleep(sk))) -			wake_up_interruptible(sk_sleep(sk)); + +		/* Wake up anyone sleeping in poll */ +		sk->sk_state_change(sk);  		res = 0;  		break; @@ -1677,7 +1758,7 @@ static int getsockopt(struct socket *sock,  		/* no need to set "res", since already 0 at this point */  		break;  	case TIPC_NODE_RECVQ_DEPTH: -		value = (u32)atomic_read(&tipc_queue_size); +		value = 0; /* was tipc_queue_size, now obsolete */  		break;  	case TIPC_SOCK_RECVQ_DEPTH:  		value = skb_queue_len(&sk->sk_receive_queue); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 0f7d0d007e22..6b42d47029af 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle,  		kfree(subscriber);  		return;  	} -	tipc_connect2port(subscriber->port_ref, orig); +	tipc_connect(subscriber->port_ref, orig);  	/* Lock server port (& save lock address for future use) */  	subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; |