diff options
Diffstat (limited to 'net/tipc/bcast.c')
| -rw-r--r-- | net/tipc/bcast.c | 194 | 
1 files changed, 119 insertions, 75 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 95ab5ef92920..26631679a1fa 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -71,7 +71,7 @@ struct tipc_bcbearer_pair {   * Note: The fields labelled "temporary" are incorporated into the bearer   * to avoid consuming potentially limited stack space through the use of   * large local variables within multicast routines.  Concurrent access is - * prevented through use of the spinlock "bc_lock". + * prevented through use of the spinlock "bclink_lock".   */  struct tipc_bcbearer {  	struct tipc_bearer bearer; @@ -84,34 +84,64 @@ struct tipc_bcbearer {  /**   * struct tipc_bclink - link used for broadcast messages + * @lock: spinlock governing access to structure   * @link: (non-standard) broadcast link structure   * @node: (non-standard) node structure representing b'cast link's peer node + * @flags: represent bclink states   * @bcast_nodes: map of broadcast-capable nodes   * @retransmit_to: node that most recently requested a retransmit   *   * Handles sequence numbering, fragmentation, bundling, etc.   */  struct tipc_bclink { +	spinlock_t lock;  	struct tipc_link link;  	struct tipc_node node; +	unsigned int flags;  	struct tipc_node_map bcast_nodes;  	struct tipc_node *retransmit_to;  }; -static struct tipc_bcbearer bcast_bearer; -static struct tipc_bclink bcast_link; - -static struct tipc_bcbearer *bcbearer = &bcast_bearer; -static struct tipc_bclink *bclink = &bcast_link; -static struct tipc_link *bcl = &bcast_link.link; - -static DEFINE_SPINLOCK(bc_lock); +static struct tipc_bcbearer *bcbearer; +static struct tipc_bclink *bclink; +static struct tipc_link *bcl;  const char tipc_bclink_name[] = "broadcast-link";  static void tipc_nmap_diff(struct tipc_node_map *nm_a,  			   struct tipc_node_map *nm_b,  			   struct tipc_node_map *nm_diff); +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); + +static void tipc_bclink_lock(void) +{ +	spin_lock_bh(&bclink->lock); +} + +static void tipc_bclink_unlock(void) +{ +	struct tipc_node *node = NULL; + +	if (likely(!bclink->flags)) { +		spin_unlock_bh(&bclink->lock); +		return; +	} + +	if (bclink->flags & TIPC_BCLINK_RESET) { +		bclink->flags &= ~TIPC_BCLINK_RESET; +		node = tipc_bclink_retransmit_to(); +	} +	spin_unlock_bh(&bclink->lock); + +	if (node) +		tipc_link_reset_all(node); +} + +void tipc_bclink_set_flags(unsigned int flags) +{ +	bclink->flags |= flags; +}  static u32 bcbuf_acks(struct sk_buff *buf)  { @@ -130,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)  void tipc_bclink_add_node(u32 addr)  { -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	tipc_nmap_add(&bclink->bcast_nodes, addr); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  }  void tipc_bclink_remove_node(u32 addr)  { -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	tipc_nmap_remove(&bclink->bcast_nodes, addr); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  }  static void bclink_set_last_sent(void) @@ -165,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)  /**   * tipc_bclink_retransmit_to - get most recent node to request retransmission   * - * Called with bc_lock locked + * Called with bclink_lock locked   */  struct tipc_node *tipc_bclink_retransmit_to(void)  { @@ -177,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)   * @after: sequence number of last packet to *not* retransmit   * @to: sequence number of last packet to retransmit   * - * Called with bc_lock locked + * Called with bclink_lock locked   */  static void bclink_retransmit_pkt(u32 after, u32 to)  { @@ -194,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to)   * @n_ptr: node that sent acknowledgement info   * @acked: broadcast sequence # that has been acknowledged   * - * Node is locked, bc_lock unlocked. + * Node is locked, bclink_lock unlocked.   */  void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  { @@ -202,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  	struct sk_buff *next;  	unsigned int released = 0; -	spin_lock_bh(&bc_lock); - +	tipc_bclink_lock();  	/* Bail out if tx queue is empty (no clean up is required) */  	crs = bcl->first_out;  	if (!crs) @@ -267,13 +296,13 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)  	if (unlikely(released && !list_empty(&bcl->waiting_ports)))  		tipc_link_wakeup_ports(bcl, 0);  exit: -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  }  /**   * tipc_bclink_update_link_state - update broadcast link state   * - * tipc_net_lock and node lock set + * RCU and node lock set   */  void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)  { @@ -320,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)  				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1  				 : n_ptr->bclink.last_sent); -		spin_lock_bh(&bc_lock); -		tipc_bearer_send(&bcbearer->bearer, buf, NULL); +		tipc_bclink_lock(); +		tipc_bearer_send(MAX_BEARERS, buf, NULL);  		bcl->stats.sent_nacks++; -		spin_unlock_bh(&bc_lock); +		tipc_bclink_unlock();  		kfree_skb(buf);  		n_ptr->bclink.oos_state++; @@ -335,8 +364,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)   *   * Delay any upcoming NACK by this node if another node has already   * requested the first message this node is going to ask for. - * - * Only tipc_net_lock set.   */  static void bclink_peek_nack(struct tipc_msg *msg)  { @@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)  {  	int res; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	if (!bclink->bcast_nodes.count) {  		res = msg_data_sz(buf_msg(buf)); @@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf)  		bcl->stats.accu_queue_sz += bcl->out_queue_size;  	}  exit: -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  	return res;  }  /**   * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet   * - * Called with both sending node's lock and bc_lock taken. + * Called with both sending node's lock and bclink_lock taken.   */  static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)  { @@ -408,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)  /**   * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards   * - * tipc_net_lock is read_locked, no other locks set + * RCU is locked, no other locks set   */  void tipc_bclink_rcv(struct sk_buff *buf)  { @@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)  		if (msg_destnode(msg) == tipc_own_addr) {  			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));  			tipc_node_unlock(node); -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bcl->stats.recv_nacks++;  			bclink->retransmit_to = node;  			bclink_retransmit_pkt(msg_bcgap_after(msg),  					      msg_bcgap_to(msg)); -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  		} else {  			tipc_node_unlock(node);  			bclink_peek_nack(msg); @@ -462,51 +489,47 @@ receive:  		/* Deliver message to destination */  		if (likely(msg_isdata(msg))) { -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bclink_accept_pkt(node, seqno); -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  			tipc_node_unlock(node);  			if (likely(msg_mcast(msg)))  				tipc_port_mcast_rcv(buf, NULL);  			else  				kfree_skb(buf);  		} else if (msg_user(msg) == MSG_BUNDLER) { -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bclink_accept_pkt(node, seqno);  			bcl->stats.recv_bundles++;  			bcl->stats.recv_bundled += msg_msgcnt(msg); -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  			tipc_node_unlock(node);  			tipc_link_bundle_rcv(buf);  		} else if (msg_user(msg) == MSG_FRAGMENTER) { -			int ret; -			ret = tipc_link_frag_rcv(&node->bclink.reasm_head, -						 &node->bclink.reasm_tail, -						 &buf); -			if (ret == LINK_REASM_ERROR) +			tipc_buf_append(&node->bclink.reasm_buf, &buf); +			if (unlikely(!buf && !node->bclink.reasm_buf))  				goto unlock; -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bclink_accept_pkt(node, seqno);  			bcl->stats.recv_fragments++; -			if (ret == LINK_REASM_COMPLETE) { +			if (buf) {  				bcl->stats.recv_fragmented++; -				/* Point msg to inner header */  				msg = buf_msg(buf); -				spin_unlock_bh(&bc_lock); +				tipc_bclink_unlock();  				goto receive;  			} -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  			tipc_node_unlock(node);  		} else if (msg_user(msg) == NAME_DISTRIBUTOR) { -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bclink_accept_pkt(node, seqno); -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  			tipc_node_unlock(node);  			tipc_named_rcv(buf);  		} else { -			spin_lock_bh(&bc_lock); +			tipc_bclink_lock();  			bclink_accept_pkt(node, seqno); -			spin_unlock_bh(&bc_lock); +			tipc_bclink_unlock();  			tipc_node_unlock(node);  			kfree_skb(buf);  		} @@ -552,14 +575,14 @@ receive:  	} else  		deferred = 0; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	if (deferred)  		bcl->stats.deferred_recv++;  	else  		bcl->stats.duplicates++; -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  unlock:  	tipc_node_unlock(node); @@ -627,13 +650,13 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,  		if (bp_index == 0) {  			/* Use original buffer for first bearer */ -			tipc_bearer_send(b, buf, &b->bcast_addr); +			tipc_bearer_send(b->identity, buf, &b->bcast_addr);  		} else {  			/* Avoid concurrent buffer access */ -			tbuf = pskb_copy(buf, GFP_ATOMIC); +			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);  			if (!tbuf)  				break; -			tipc_bearer_send(b, tbuf, &b->bcast_addr); +			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);  			kfree_skb(tbuf); /* Bearer keeps a clone */  		} @@ -655,20 +678,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,  /**   * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer   */ -void tipc_bcbearer_sort(void) +void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)  {  	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;  	struct tipc_bcbearer_pair *bp_curr; +	struct tipc_bearer *b;  	int b_index;  	int pri; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock(); + +	if (action) +		tipc_nmap_add(nm_ptr, node); +	else +		tipc_nmap_remove(nm_ptr, node);  	/* Group bearers by priority (can assume max of two per priority) */  	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); +	rcu_read_lock();  	for (b_index = 0; b_index < MAX_BEARERS; b_index++) { -		struct tipc_bearer *b = bearer_list[b_index]; +		b = rcu_dereference_rtnl(bearer_list[b_index]);  		if (!b || !b->nodes.count)  			continue; @@ -677,6 +707,7 @@ void tipc_bcbearer_sort(void)  		else  			bp_temp[b->priority].secondary = b;  	} +	rcu_read_unlock();  	/* Create array of bearer pairs for broadcasting */  	bp_curr = bcbearer->bpairs; @@ -702,7 +733,7 @@ void tipc_bcbearer_sort(void)  		bp_curr++;  	} -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  } @@ -714,7 +745,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)  	if (!bcl)  		return 0; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	s = &bcl->stats; @@ -743,7 +774,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)  			     s->queue_sz_counts ?  			     (s->accu_queue_sz / s->queue_sz_counts) : 0); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  	return ret;  } @@ -752,9 +783,9 @@ int tipc_bclink_reset_stats(void)  	if (!bcl)  		return -ENOPROTOOPT; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	memset(&bcl->stats, 0, sizeof(bcl->stats)); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  	return 0;  } @@ -765,46 +796,59 @@ int tipc_bclink_set_queue_limits(u32 limit)  	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))  		return -EINVAL; -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	tipc_link_set_queue_limits(bcl, limit); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock();  	return 0;  } -void tipc_bclink_init(void) +int tipc_bclink_init(void)  { +	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); +	if (!bcbearer) +		return -ENOMEM; + +	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); +	if (!bclink) { +		kfree(bcbearer); +		return -ENOMEM; +	} + +	bcl = &bclink->link;  	bcbearer->bearer.media = &bcbearer->media;  	bcbearer->media.send_msg = tipc_bcbearer_send;  	sprintf(bcbearer->media.name, "tipc-broadcast"); +	spin_lock_init(&bclink->lock);  	INIT_LIST_HEAD(&bcl->waiting_ports);  	bcl->next_out_no = 1;  	spin_lock_init(&bclink->node.lock);  	bcl->owner = &bclink->node;  	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;  	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); -	bcl->b_ptr = &bcbearer->bearer; -	bearer_list[BCBEARER] = &bcbearer->bearer; +	bcl->bearer_id = MAX_BEARERS; +	rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);  	bcl->state = WORKING_WORKING;  	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); +	return 0;  }  void tipc_bclink_stop(void)  { -	spin_lock_bh(&bc_lock); +	tipc_bclink_lock();  	tipc_link_purge_queues(bcl); -	spin_unlock_bh(&bc_lock); +	tipc_bclink_unlock(); -	bearer_list[BCBEARER] = NULL; -	memset(bclink, 0, sizeof(*bclink)); -	memset(bcbearer, 0, sizeof(*bcbearer)); +	RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); +	synchronize_net(); +	kfree(bcbearer); +	kfree(bclink);  } -  /**   * tipc_nmap_add - add a node to a node map   */ -void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)  {  	int n = tipc_node(node);  	int w = n / WSIZE; @@ -819,7 +863,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)  /**   * tipc_nmap_remove - remove a node from a node map   */ -void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) +static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)  {  	int n = tipc_node(node);  	int w = n / WSIZE;  |