diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/bcast.c | 12 | ||||
| -rw-r--r-- | net/tipc/core.h | 1 | ||||
| -rw-r--r-- | net/tipc/group.c | 371 | ||||
| -rw-r--r-- | net/tipc/group.h | 10 | ||||
| -rw-r--r-- | net/tipc/link.c | 2 | ||||
| -rw-r--r-- | net/tipc/msg.c | 51 | ||||
| -rw-r--r-- | net/tipc/msg.h | 3 | ||||
| -rw-r--r-- | net/tipc/name_table.c | 57 | ||||
| -rw-r--r-- | net/tipc/name_table.h | 9 | ||||
| -rw-r--r-- | net/tipc/server.c | 80 | ||||
| -rw-r--r-- | net/tipc/server.h | 13 | ||||
| -rw-r--r-- | net/tipc/socket.c | 118 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 35 | ||||
| -rw-r--r-- | net/tipc/subscr.h | 2 | 
14 files changed, 427 insertions, 337 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 329325bd553e..37892b3909af 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -1,7 +1,7 @@  /*   * net/tipc/bcast.c: TIPC broadcast code   * - * Copyright (c) 2004-2006, 2014-2016, Ericsson AB + * Copyright (c) 2004-2006, 2014-2017, Ericsson AB   * Copyright (c) 2004, Intel Corporation.   * Copyright (c) 2005, 2010-2011, Wind River Systems   * All rights reserved. @@ -42,8 +42,8 @@  #include "link.h"  #include "name_table.h" -#define	BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */ -#define	BCLINK_WIN_MIN	        32	/* bcast minimum link window size */ +#define BCLINK_WIN_DEFAULT  50	/* bcast link window size (default) */ +#define BCLINK_WIN_MIN      32	/* bcast minimum link window size */  const char tipc_bclink_name[] = "broadcast-link"; @@ -74,6 +74,10 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)  	return tipc_net(net)->bcbase;  } +/* tipc_bcast_get_mtu(): -get the MTU currently used by broadcast link + * Note: the MTU is decremented to give room for a tunnel header, in + * case the message needs to be sent as replicast + */  int tipc_bcast_get_mtu(struct net *net)  {  	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; @@ -515,7 +519,7 @@ int tipc_bcast_init(struct net *net)  	spin_lock_init(&tipc_net(net)->bclock);  	if (!tipc_link_bc_create(net, 0, 0, -				 U16_MAX, +				 FB_MTU,  				 BCLINK_WIN_DEFAULT,  				 0,  				 &bb->inputq, diff --git a/net/tipc/core.h b/net/tipc/core.h index 964342689f2c..20b21af2ff14 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -49,7 +49,6 @@  #include <linux/uaccess.h>  #include <linux/interrupt.h>  #include <linux/atomic.h> -#include <asm/hardirq.h>  #include <linux/netdevice.h>  #include <linux/in.h>  #include <linux/list.h> diff --git a/net/tipc/group.c b/net/tipc/group.c index 5f4ffae807ee..122162a31816 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -49,8 +49,6 @@  #define ADV_ACTIVE (ADV_UNIT * 12)  enum mbr_state { -	MBR_QUARANTINED, -	MBR_DISCOVERED,  	MBR_JOINING,  	MBR_PUBLISHED,  	MBR_JOINED, @@ -64,8 +62,7 @@ enum mbr_state {  struct tipc_member {  	struct rb_node tree_node;  	struct list_head list; -	struct list_head congested; -	struct sk_buff *event_msg; +	struct list_head small_win;  	struct sk_buff_head deferredq;  	struct tipc_group *group;  	u32 node; @@ -77,21 +74,18 @@ struct tipc_member {  	u16 bc_rcv_nxt;  	u16 bc_syncpt;  	u16 bc_acked; -	bool usr_pending;  };  struct tipc_group {  	struct rb_root members; -	struct list_head congested; +	struct list_head small_win;  	struct list_head pending;  	struct list_head active; -	struct list_head reclaiming;  	struct tipc_nlist dests;  	struct net *net;  	int subid;  	u32 type;  	u32 instance; -	u32 domain;  	u32 scope;  	u32 portid;  	u16 member_cnt; @@ -99,6 +93,7 @@ struct tipc_group {  	u16 max_active;  	u16 bc_snd_nxt;  	u16 bc_ackers; +	bool *open;  	bool loopback;  	bool events;  }; @@ -106,6 +101,16 @@ struct tipc_group {  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq); +static void tipc_group_open(struct tipc_member *m, bool *wakeup) +{ +	*wakeup = false; +	if (list_empty(&m->small_win)) +		return; +	list_del_init(&m->small_win); +	*m->group->open = true; +	*wakeup = true; +} +  static void tipc_group_decr_active(struct tipc_group *grp,  				   struct tipc_member *m)  { @@ -137,14 +142,14 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)  	return grp->bc_snd_nxt;  } -static bool tipc_group_is_enabled(struct tipc_member *m) +static bool tipc_group_is_receiver(struct tipc_member *m)  { -	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +	return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;  } -static bool tipc_group_is_receiver(struct tipc_member *m) +static bool tipc_group_is_sender(struct tipc_member *m)  { -	return m && m->state >= MBR_JOINED; +	return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;  }  u32 tipc_group_exclude(struct tipc_group *grp) @@ -160,8 +165,11 @@ int tipc_group_size(struct tipc_group *grp)  }  struct tipc_group *tipc_group_create(struct net *net, u32 portid, -				     struct tipc_group_req *mreq) +				     struct tipc_group_req *mreq, +				     bool *group_is_open)  { +	u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; +	bool global = mreq->scope != TIPC_NODE_SCOPE;  	struct tipc_group *grp;  	u32 type = mreq->type; @@ -169,25 +177,41 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,  	if (!grp)  		return NULL;  	tipc_nlist_init(&grp->dests, tipc_own_addr(net)); -	INIT_LIST_HEAD(&grp->congested); +	INIT_LIST_HEAD(&grp->small_win);  	INIT_LIST_HEAD(&grp->active);  	INIT_LIST_HEAD(&grp->pending); -	INIT_LIST_HEAD(&grp->reclaiming);  	grp->members = RB_ROOT;  	grp->net = net;  	grp->portid = portid; -	grp->domain = addr_domain(net, mreq->scope);  	grp->type = type;  	grp->instance = mreq->instance;  	grp->scope = mreq->scope;  	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;  	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; -	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) +	grp->open = group_is_open; +	filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; +	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, +				    filter, &grp->subid))  		return grp;  	kfree(grp);  	return NULL;  } +void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) +{ +	struct rb_root *tree = &grp->members; +	struct tipc_member *m, *tmp; +	struct sk_buff_head xmitq; + +	skb_queue_head_init(&xmitq); +	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { +		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); +		tipc_group_update_member(m, 0); +	} +	tipc_node_distr_xmit(net, &xmitq); +	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp); +} +  void tipc_group_delete(struct net *net, struct tipc_group *grp)  {  	struct rb_root *tree = &grp->members; @@ -233,7 +257,7 @@ static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,  	struct tipc_member *m;  	m = tipc_group_find_member(grp, node, port); -	if (m && tipc_group_is_enabled(m)) +	if (m && tipc_group_is_receiver(m))  		return m;  	return NULL;  } @@ -278,7 +302,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,  static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  						    u32 node, u32 port, -						    int state) +						    u32 instance, int state)  {  	struct tipc_member *m; @@ -286,11 +310,12 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  	if (!m)  		return NULL;  	INIT_LIST_HEAD(&m->list); -	INIT_LIST_HEAD(&m->congested); +	INIT_LIST_HEAD(&m->small_win);  	__skb_queue_head_init(&m->deferredq);  	m->group = grp;  	m->node = node;  	m->port = port; +	m->instance = instance;  	m->bc_acked = grp->bc_snd_nxt - 1;  	grp->member_cnt++;  	tipc_group_add_to_tree(grp, m); @@ -299,9 +324,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  	return m;  } -void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +void tipc_group_add_member(struct tipc_group *grp, u32 node, +			   u32 port, u32 instance)  { -	tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +	tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);  }  static void tipc_group_delete_member(struct tipc_group *grp, @@ -315,7 +341,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,  		grp->bc_ackers--;  	list_del_init(&m->list); -	list_del_init(&m->congested); +	list_del_init(&m->small_win);  	tipc_group_decr_active(grp, m);  	/* If last member on a node, remove node from dest list */ @@ -344,7 +370,7 @@ void tipc_group_update_member(struct tipc_member *m, int len)  	struct tipc_group *grp = m->group;  	struct tipc_member *_m, *tmp; -	if (!tipc_group_is_enabled(m)) +	if (!tipc_group_is_receiver(m))  		return;  	m->window -= len; @@ -352,16 +378,14 @@ void tipc_group_update_member(struct tipc_member *m, int len)  	if (m->window >= ADV_IDLE)  		return; -	list_del_init(&m->congested); +	list_del_init(&m->small_win); -	/* Sort member into congested members' list */ -	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { -		if (m->window > _m->window) -			continue; -		list_add_tail(&m->congested, &_m->congested); -		return; +	/* Sort member into small_window members' list */ +	list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { +		if (_m->window > m->window) +			break;  	} -	list_add_tail(&m->congested, &grp->congested); +	list_add_tail(&m->small_win, &_m->small_win);  }  void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) @@ -373,7 +397,7 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)  	for (n = rb_first(&grp->members); n; n = rb_next(n)) {  		m = container_of(n, struct tipc_member, tree_node); -		if (tipc_group_is_enabled(m)) { +		if (tipc_group_is_receiver(m)) {  			tipc_group_update_member(m, len);  			m->bc_acked = prev;  			ackers++; @@ -394,20 +418,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,  	int adv, state;  	m = tipc_group_find_dest(grp, dnode, dport); -	*mbr = m; -	if (!m) +	if (!tipc_group_is_receiver(m)) { +		*mbr = NULL;  		return false; -	if (m->usr_pending) -		return true; +	} +	*mbr = m; +  	if (m->window >= len)  		return false; -	m->usr_pending = true; + +	*grp->open = false;  	/* If not fully advertised, do it now to prevent mutual blocking */  	adv = m->advertised;  	state = m->state; -	if (state < MBR_JOINED) -		return true;  	if (state == MBR_JOINED && adv == ADV_IDLE)  		return true;  	if (state == MBR_ACTIVE && adv == ADV_ACTIVE) @@ -425,13 +449,14 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)  	struct tipc_member *m = NULL;  	/* If prev bcast was replicast, reject until all receivers have acked */ -	if (grp->bc_ackers) +	if (grp->bc_ackers) { +		*grp->open = false;  		return true; - -	if (list_empty(&grp->congested)) +	} +	if (list_empty(&grp->small_win))  		return false; -	m = list_first_entry(&grp->congested, struct tipc_member, congested); +	m = list_first_entry(&grp->small_win, struct tipc_member, small_win);  	if (m->window >= len)  		return false; @@ -486,7 +511,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,  		goto drop;  	m = tipc_group_find_member(grp, node, port); -	if (!tipc_group_is_receiver(m)) +	if (!tipc_group_is_sender(m))  		goto drop;  	if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) @@ -573,24 +598,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  	switch (m->state) {  	case MBR_JOINED: -		/* Reclaim advertised space from least active member */ -		if (!list_empty(active) && active_cnt >= reclaim_limit) { +		/* First, decide if member can go active */ +		if (active_cnt <= max_active) { +			m->state = MBR_ACTIVE; +			list_add_tail(&m->list, active); +			grp->active_cnt++; +			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +		} else { +			m->state = MBR_PENDING; +			list_add_tail(&m->list, &grp->pending); +		} + +		if (active_cnt < reclaim_limit) +			break; + +		/* Reclaim from oldest active member, if possible */ +		if (!list_empty(active)) {  			rm = list_first_entry(active, struct tipc_member, list);  			rm->state = MBR_RECLAIMING; -			list_move_tail(&rm->list, &grp->reclaiming); +			list_del_init(&rm->list);  			tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); -		} -		/* If max active, become pending and wait for reclaimed space */ -		if (active_cnt >= max_active) { -			m->state = MBR_PENDING; -			list_add_tail(&m->list, &grp->pending);  			break;  		} -		/* Otherwise become active */ -		m->state = MBR_ACTIVE; -		list_add_tail(&m->list, &grp->active); -		grp->active_cnt++; -		/* Fall through */ +		/* Nobody to reclaim from; - revert oldest pending to JOINED */ +		pm = list_first_entry(&grp->pending, struct tipc_member, list); +		list_del_init(&pm->list); +		pm->state = MBR_JOINED; +		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); +		break;  	case MBR_ACTIVE:  		if (!list_is_last(&m->list, &grp->active))  			list_move_tail(&m->list, &grp->active); @@ -602,12 +637,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  		if (m->advertised > ADV_IDLE)  			break;  		m->state = MBR_JOINED; +		grp->active_cnt--;  		if (m->advertised < ADV_IDLE) {  			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");  			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);  		} -		grp->active_cnt--; -		list_del_init(&m->list); +  		if (list_empty(&grp->pending))  			return; @@ -619,7 +654,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);  		break;  	case MBR_RECLAIMING: -	case MBR_DISCOVERED:  	case MBR_JOINING:  	case MBR_LEAVING:  	default: @@ -627,6 +661,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  	}  } +static void tipc_group_create_event(struct tipc_group *grp, +				    struct tipc_member *m, +				    u32 event, u16 seqno, +				    struct sk_buff_head *inputq) +{	u32 dnode = tipc_own_addr(grp->net); +	struct tipc_event evt; +	struct sk_buff *skb; +	struct tipc_msg *hdr; + +	evt.event = event; +	evt.found_lower = m->instance; +	evt.found_upper = m->instance; +	evt.port.ref = m->port; +	evt.port.node = m->node; +	evt.s.seq.type = grp->type; +	evt.s.seq.lower = m->instance; +	evt.s.seq.upper = m->instance; + +	skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, +			      GROUP_H_SIZE, sizeof(evt), dnode, m->node, +			      grp->portid, m->port, 0); +	if (!skb) +		return; + +	hdr = buf_msg(skb); +	msg_set_nametype(hdr, grp->type); +	msg_set_grp_evt(hdr, event); +	msg_set_dest_droppable(hdr, true); +	msg_set_grp_bc_seqno(hdr, seqno); +	memcpy(msg_data(hdr), &evt, sizeof(evt)); +	TIPC_SKB_CB(skb)->orig_member = m->instance; +	__skb_queue_tail(inputq, skb); +} +  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq)  { @@ -672,83 +740,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  	u32 node = msg_orignode(hdr);  	u32 port = msg_origport(hdr);  	struct tipc_member *m, *pm; -	struct tipc_msg *ehdr;  	u16 remitted, in_flight;  	if (!grp)  		return; +	if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) +		return; +  	m = tipc_group_find_member(grp, node, port);  	switch (msg_type(hdr)) {  	case GRP_JOIN_MSG:  		if (!m)  			m = tipc_group_create_member(grp, node, port, -						     MBR_QUARANTINED); +						     0, MBR_JOINING);  		if (!m)  			return;  		m->bc_syncpt = msg_grp_bc_syncpt(hdr);  		m->bc_rcv_nxt = m->bc_syncpt;  		m->window += msg_adv_win(hdr); -		/* Wait until PUBLISH event is received */ -		if (m->state == MBR_DISCOVERED) { -			m->state = MBR_JOINING; -		} else if (m->state == MBR_PUBLISHED) { -			m->state = MBR_JOINED; -			*usr_wakeup = true; -			m->usr_pending = false; -			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); -			ehdr = buf_msg(m->event_msg); -			msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); -			__skb_queue_tail(inputq, m->event_msg); -		} -		list_del_init(&m->congested); +		/* Wait until PUBLISH event is received if necessary */ +		if (m->state != MBR_PUBLISHED) +			return; + +		/* Member can be taken into service */ +		m->state = MBR_JOINED; +		tipc_group_open(m, usr_wakeup);  		tipc_group_update_member(m, 0); +		tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +		tipc_group_create_event(grp, m, TIPC_PUBLISHED, +					m->bc_syncpt, inputq);  		return;  	case GRP_LEAVE_MSG:  		if (!m)  			return;  		m->bc_syncpt = msg_grp_bc_syncpt(hdr);  		list_del_init(&m->list); -		list_del_init(&m->congested); -		*usr_wakeup = true; - -		/* Wait until WITHDRAW event is received */ -		if (m->state != MBR_LEAVING) { -			tipc_group_decr_active(grp, m); -			m->state = MBR_LEAVING; -			return; -		} -		/* Otherwise deliver already received WITHDRAW event */ -		ehdr = buf_msg(m->event_msg); -		msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); -		__skb_queue_tail(inputq, m->event_msg); +		tipc_group_open(m, usr_wakeup); +		tipc_group_decr_active(grp, m); +		m->state = MBR_LEAVING; +		tipc_group_create_event(grp, m, TIPC_WITHDRAWN, +					m->bc_syncpt, inputq);  		return;  	case GRP_ADV_MSG:  		if (!m)  			return;  		m->window += msg_adv_win(hdr); -		*usr_wakeup = m->usr_pending; -		m->usr_pending = false; -		list_del_init(&m->congested); +		tipc_group_open(m, usr_wakeup);  		return;  	case GRP_ACK_MSG:  		if (!m)  			return;  		m->bc_acked = msg_grp_bc_acked(hdr);  		if (--grp->bc_ackers) -			break; +			return; +		list_del_init(&m->small_win); +		*m->group->open = true;  		*usr_wakeup = true; -		m->usr_pending = false; +		tipc_group_update_member(m, 0);  		return;  	case GRP_RECLAIM_MSG:  		if (!m)  			return; -		*usr_wakeup = m->usr_pending; -		m->usr_pending = false;  		tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);  		m->window = ADV_IDLE; +		tipc_group_open(m, usr_wakeup);  		return;  	case GRP_REMIT_MSG:  		if (!m || m->state != MBR_RECLAIMING) @@ -763,18 +821,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  			m->advertised = ADV_IDLE + in_flight;  			return;  		} -		/* All messages preceding the REMIT have been read */ -		if (m->advertised <= remitted) { -			m->state = MBR_JOINED; -			in_flight = 0; -		} -		/* ..and the REMIT overtaken by more messages => re-advertise */ +		/* This should never happen */  		if (m->advertised < remitted) -			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +			pr_warn_ratelimited("Unexpected REMIT msg\n"); -		m->advertised = ADV_IDLE + in_flight; +		/* All messages preceding the REMIT have been read */ +		m->state = MBR_JOINED;  		grp->active_cnt--; -		list_del_init(&m->list); +		m->advertised = ADV_IDLE;  		/* Set oldest pending member to active and advertise */  		if (list_empty(&grp->pending)) @@ -796,11 +850,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  void tipc_group_member_evt(struct tipc_group *grp,  			   bool *usr_wakeup,  			   int *sk_rcvbuf, -			   struct sk_buff *skb, +			   struct tipc_msg *hdr,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq)  { -	struct tipc_msg *hdr = buf_msg(skb);  	struct tipc_event *evt = (void *)msg_data(hdr);  	u32 instance = evt->found_lower;  	u32 node = evt->port.node; @@ -808,89 +861,59 @@ void tipc_group_member_evt(struct tipc_group *grp,  	int event = evt->event;  	struct tipc_member *m;  	struct net *net; -	bool node_up;  	u32 self;  	if (!grp) -		goto drop; +		return;  	net = grp->net;  	self = tipc_own_addr(net);  	if (!grp->loopback && node == self && port == grp->portid) -		goto drop; - -	/* Convert message before delivery to user */ -	msg_set_hdr_sz(hdr, GROUP_H_SIZE); -	msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); -	msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); -	msg_set_origport(hdr, port); -	msg_set_orignode(hdr, node); -	msg_set_nametype(hdr, grp->type); -	msg_set_grp_evt(hdr, event); +		return;  	m = tipc_group_find_member(grp, node, port); -	if (event == TIPC_PUBLISHED) { -		if (!m) -			m = tipc_group_create_member(grp, node, port, -						     MBR_DISCOVERED); -		if (!m) -			goto drop; - -		/* Hold back event if JOIN message not yet received */ -		if (m->state == MBR_DISCOVERED) { -			m->event_msg = skb; -			m->state = MBR_PUBLISHED; -		} else { -			msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -			__skb_queue_tail(inputq, skb); -			m->state = MBR_JOINED; -			*usr_wakeup = true; -			m->usr_pending = false; +	switch (event) { +	case TIPC_PUBLISHED: +		/* Send and wait for arrival of JOIN message if necessary */ +		if (!m) { +			m = tipc_group_create_member(grp, node, port, instance, +						     MBR_PUBLISHED); +			if (!m) +				break; +			tipc_group_update_member(m, 0); +			tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); +			break;  		} + +		if (m->state != MBR_JOINING) +			break; + +		/* Member can be taken into service */  		m->instance = instance; -		TIPC_SKB_CB(skb)->orig_member = m->instance; +		m->state = MBR_JOINED; +		tipc_group_open(m, usr_wakeup); +		tipc_group_update_member(m, 0);  		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); -		if (m->window < ADV_IDLE) -			tipc_group_update_member(m, 0); -		else -			list_del_init(&m->congested); -	} else if (event == TIPC_WITHDRAWN) { +		tipc_group_create_event(grp, m, TIPC_PUBLISHED, +					m->bc_syncpt, inputq); +		break; +	case TIPC_WITHDRAWN:  		if (!m) -			goto drop; - -		TIPC_SKB_CB(skb)->orig_member = m->instance; +			break; -		*usr_wakeup = true; -		m->usr_pending = false; -		node_up = tipc_node_is_up(net, node); -		m->event_msg = NULL; - -		if (node_up) { -			/* Hold back event if a LEAVE msg should be expected */ -			if (m->state != MBR_LEAVING) { -				m->event_msg = skb; -				tipc_group_decr_active(grp, m); -				m->state = MBR_LEAVING; -			} else { -				msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -				__skb_queue_tail(inputq, skb); -			} -		} else { -			if (m->state != MBR_LEAVING) { -				tipc_group_decr_active(grp, m); -				m->state = MBR_LEAVING; -				msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); -			} else { -				msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -			} -			__skb_queue_tail(inputq, skb); -		} +		tipc_group_decr_active(grp, m); +		m->state = MBR_LEAVING;  		list_del_init(&m->list); -		list_del_init(&m->congested); +		tipc_group_open(m, usr_wakeup); + +		/* Only send event if no LEAVE message can be expected */ +		if (!tipc_node_is_up(net, node)) +			tipc_group_create_event(grp, m, TIPC_WITHDRAWN, +						m->bc_rcv_nxt, inputq); +		break; +	default: +		break;  	}  	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp); -	return; -drop: -	kfree_skb(skb);  } diff --git a/net/tipc/group.h b/net/tipc/group.h index d525e1cd7de5..5996af6e9f1d 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -43,9 +43,12 @@ struct tipc_member;  struct tipc_msg;  struct tipc_group *tipc_group_create(struct net *net, u32 portid, -				     struct tipc_group_req *mreq); +				     struct tipc_group_req *mreq, +				     bool *group_is_open); +void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcv_buf);  void tipc_group_delete(struct net *net, struct tipc_group *grp); -void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); +void tipc_group_add_member(struct tipc_group *grp, u32 node, +			   u32 port, u32 instance);  struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);  void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,  		     int *scope); @@ -54,7 +57,7 @@ void tipc_group_filter_msg(struct tipc_group *grp,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq);  void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, -			   int *sk_rcvbuf, struct sk_buff *skb, +			   int *sk_rcvbuf, struct tipc_msg *hdr,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq);  void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, @@ -69,5 +72,4 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  			       u32 port, struct sk_buff_head *xmitq);  u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);  void tipc_group_update_member(struct tipc_member *m, int len); -int tipc_group_size(struct tipc_group *grp);  #endif diff --git a/net/tipc/link.c b/net/tipc/link.c index 6bce0b1117bd..2d6b2aed30e0 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -483,7 +483,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,  /**   * tipc_link_bc_create - create new link to be used for broadcast   * @n: pointer to associated node - * @mtu: mtu to be used + * @mtu: mtu to be used initially if no peers   * @window: send window to be used   * @inputq: queue to put messages ready for delivery   * @namedq: queue to put binding table update messages ready for delivery diff --git a/net/tipc/msg.c b/net/tipc/msg.c index b0d07b35909d..55d8ba92291d 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -251,20 +251,23 @@ bool tipc_msg_validate(struct sk_buff **_skb)   * @pktmax: Max packet size that can be used   * @list: Buffer or chain of buffers to be returned to caller   * + * Note that the recursive call we are making here is safe, since it can + * logically go only one further level down. + *   * Returns message data size or errno: -ENOMEM, -EFAULT   */ -int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, -		   int offset, int dsz, int pktmax, struct sk_buff_head *list) +int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, +		   int dsz, int pktmax, struct sk_buff_head *list)  {  	int mhsz = msg_hdr_sz(mhdr); +	struct tipc_msg pkthdr;  	int msz = mhsz + dsz; -	int pktno = 1; -	int pktsz;  	int pktrem = pktmax; -	int drem = dsz; -	struct tipc_msg pkthdr;  	struct sk_buff *skb; +	int drem = dsz; +	int pktno = 1;  	char *pktpos; +	int pktsz;  	int rc;  	msg_set_size(mhdr, msz); @@ -272,8 +275,18 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  	/* No fragmentation needed? */  	if (likely(msz <= pktmax)) {  		skb = tipc_buf_acquire(msz, GFP_KERNEL); -		if (unlikely(!skb)) + +		/* Fall back to smaller MTU if node local message */ +		if (unlikely(!skb)) { +			if (pktmax != MAX_MSG_SIZE) +				return -ENOMEM; +			rc = tipc_msg_build(mhdr, m, offset, dsz, FB_MTU, list); +			if (rc != dsz) +				return rc; +			if (tipc_msg_assemble(list)) +				return dsz;  			return -ENOMEM; +		}  		skb_orphan(skb);  		__skb_queue_tail(list, skb);  		skb_copy_to_linear_data(skb, mhdr, mhsz); @@ -589,6 +602,30 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)  	return true;  } +/* tipc_msg_assemble() - assemble chain of fragments into one message + */ +bool tipc_msg_assemble(struct sk_buff_head *list) +{ +	struct sk_buff *skb, *tmp = NULL; + +	if (skb_queue_len(list) == 1) +		return true; + +	while ((skb = __skb_dequeue(list))) { +		skb->next = NULL; +		if (tipc_buf_append(&tmp, &skb)) { +			__skb_queue_tail(list, skb); +			return true; +		} +		if (!tmp) +			break; +	} +	__skb_queue_purge(list); +	__skb_queue_head_init(list); +	pr_warn("Failed do assemble buffer\n"); +	return false; +} +  /* tipc_msg_reassemble() - clone a buffer chain of fragments and   *                         reassemble the clones into one message   */ diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 3e4384c222f7..b4ba1b4f9ae7 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -98,7 +98,7 @@ struct plist;  #define MAX_H_SIZE                60	/* Largest possible TIPC header size */  #define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE) - +#define FB_MTU                  3744  #define TIPC_MEDIA_INFO_OFFSET	5  struct tipc_skb_cb { @@ -943,6 +943,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);  int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,  		   int offset, int dsz, int mtu, struct sk_buff_head *list);  bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); +bool tipc_msg_assemble(struct sk_buff_head *list);  bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);  bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,  			struct sk_buff_head *cpy); diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index b3829bcf63c7..ed0457cc99d6 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,  	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {  		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,  					    TIPC_PUBLISHED, publ->ref, -					    publ->node, created_subseq); +					    publ->node, publ->scope, +					    created_subseq);  	}  	return publ;  } @@ -398,19 +399,21 @@ found:  	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {  		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,  					    TIPC_WITHDRAWN, publ->ref, -					    publ->node, removed_subseq); +					    publ->node, publ->scope, +					    removed_subseq);  	}  	return publ;  }  /** - * tipc_nameseq_subscribe - attach a subscription, and issue - * the prescribed number of events if there is any sub- + * tipc_nameseq_subscribe - attach a subscription, and optionally + * issue the prescribed number of events if there is any sub-   * sequence overlapping with the requested sequence   */  static void tipc_nameseq_subscribe(struct name_seq *nseq, -				   struct tipc_subscription *s) +				   struct tipc_subscription *s, +				   bool status)  {  	struct sub_seq *sseq = nseq->sseqs;  	struct tipc_name_seq ns; @@ -420,7 +423,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,  	tipc_subscrp_get(s);  	list_add(&s->nameseq_list, &nseq->subscriptions); -	if (!sseq) +	if (!status || !sseq)  		return;  	while (sseq != &nseq->sseqs[nseq->first_free]) { @@ -434,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,  							    sseq->upper,  							    TIPC_PUBLISHED,  							    crs->ref, crs->node, +							    crs->scope,  							    must_report);  				must_report = 0;  			} @@ -597,7 +601,7 @@ not_found:  	return ref;  } -bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, +bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,  			 struct list_head *dsts, int *dstcnt, u32 exclude,  			 bool all)  { @@ -607,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,  	struct name_seq *seq;  	struct sub_seq *sseq; -	if (!tipc_in_scope(domain, self)) -		return false; -  	*dstcnt = 0;  	rcu_read_lock();  	seq = nametbl_find_seq(net, type); @@ -620,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,  	if (likely(sseq)) {  		info = sseq->info;  		list_for_each_entry(publ, &info->zone_list, zone_list) { -			if (!tipc_in_scope(domain, publ->node)) +			if (publ->scope != scope)  				continue;  			if (publ->ref == exclude && publ->node == self)  				continue; @@ -638,13 +639,14 @@ exit:  	return !list_empty(dsts);  } -int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, -			      u32 limit, struct list_head *dports) +int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, +			   u32 scope, bool exact, struct list_head *dports)  { -	struct name_seq *seq; -	struct sub_seq *sseq;  	struct sub_seq *sseq_stop;  	struct name_info *info; +	struct publication *p; +	struct name_seq *seq; +	struct sub_seq *sseq;  	int res = 0;  	rcu_read_lock(); @@ -656,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,  	sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);  	sseq_stop = seq->sseqs + seq->first_free;  	for (; sseq != sseq_stop; sseq++) { -		struct publication *publ; -  		if (sseq->lower > upper)  			break; -  		info = sseq->info; -		list_for_each_entry(publ, &info->node_list, node_list) { -			if (publ->scope <= limit) -				tipc_dest_push(dports, 0, publ->ref); +		list_for_each_entry(p, &info->node_list, node_list) { +			if (p->scope == scope || (!exact && p->scope < scope)) +				tipc_dest_push(dports, 0, p->ref);  		}  		if (info->cluster_list_size != info->node_list_size) @@ -681,8 +680,7 @@ exit:   * - Determines if any node local ports overlap   */  void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, -				   u32 upper, u32 domain, -				   struct tipc_nlist *nodes) +				   u32 upper, struct tipc_nlist *nodes)  {  	struct sub_seq *sseq, *stop;  	struct publication *publ; @@ -700,8 +698,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,  	for (; sseq != stop && sseq->lower <= upper; sseq++) {  		info = sseq->info;  		list_for_each_entry(publ, &info->zone_list, zone_list) { -			if (tipc_in_scope(domain, publ->node)) -				tipc_nlist_add(nodes, publ->node); +			tipc_nlist_add(nodes, publ->node);  		}  	}  	spin_unlock_bh(&seq->lock); @@ -712,7 +709,7 @@ exit:  /* tipc_nametbl_build_group - build list of communication group members   */  void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, -			      u32 type, u32 domain) +			      u32 type, u32 scope)  {  	struct sub_seq *sseq, *stop;  	struct name_info *info; @@ -730,9 +727,9 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,  	for (; sseq != stop; sseq++) {  		info = sseq->info;  		list_for_each_entry(p, &info->zone_list, zone_list) { -			if (!tipc_in_scope(domain, p->node)) +			if (p->scope != scope)  				continue; -			tipc_group_add_member(grp, p->node, p->ref); +			tipc_group_add_member(grp, p->node, p->ref, p->lower);  		}  	}  	spin_unlock_bh(&seq->lock); @@ -811,7 +808,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,  /**   * tipc_nametbl_subscribe - add a subscription object to the name table   */ -void tipc_nametbl_subscribe(struct tipc_subscription *s) +void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)  {  	struct tipc_net *tn = net_generic(s->net, tipc_net_id);  	u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); @@ -825,7 +822,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)  		seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);  	if (seq) {  		spin_lock_bh(&seq->lock); -		tipc_nameseq_subscribe(seq, s); +		tipc_nameseq_subscribe(seq, s, status);  		spin_unlock_bh(&seq->lock);  	} else {  		tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index 71926e429446..f56e7cb3d436 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -100,13 +100,12 @@ struct name_table {  int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);  u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); -int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, -			      u32 limit, struct list_head *dports); +int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper, +			   u32 scope, bool exact, struct list_head *dports);  void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,  			      u32 type, u32 domain);  void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, -				   u32 upper, u32 domain, -				   struct tipc_nlist *nodes); +				   u32 upper, struct tipc_nlist *nodes);  bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,  			 struct list_head *dsts, int *dstcnt, u32 exclude,  			 bool all); @@ -121,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,  struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,  					     u32 lower, u32 node, u32 ref,  					     u32 key); -void tipc_nametbl_subscribe(struct tipc_subscription *s); +void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);  void tipc_nametbl_unsubscribe(struct tipc_subscription *s);  int tipc_nametbl_init(struct net *net);  void tipc_nametbl_stop(struct net *net); diff --git a/net/tipc/server.c b/net/tipc/server.c index d60c30342327..df0c563c90cd 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -132,10 +132,11 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)  	spin_lock_bh(&s->idr_lock);  	con = idr_find(&s->conn_idr, conid); -	if (con && test_bit(CF_CONNECTED, &con->flags)) -		conn_get(con); -	else -		con = NULL; +	if (con) { +		if (!test_bit(CF_CONNECTED, &con->flags) || +		    !kref_get_unless_zero(&con->kref)) +			con = NULL; +	}  	spin_unlock_bh(&s->idr_lock);  	return con;  } @@ -183,35 +184,28 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)  	write_unlock_bh(&sk->sk_callback_lock);  } -static void tipc_unregister_callbacks(struct tipc_conn *con) -{ -	struct sock *sk = con->sock->sk; - -	write_lock_bh(&sk->sk_callback_lock); -	sk->sk_user_data = NULL; -	write_unlock_bh(&sk->sk_callback_lock); -} -  static void tipc_close_conn(struct tipc_conn *con)  {  	struct tipc_server *s = con->server; +	struct sock *sk = con->sock->sk; +	bool disconnect = false; -	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { -		if (con->sock) -			tipc_unregister_callbacks(con); - +	write_lock_bh(&sk->sk_callback_lock); +	disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); +	if (disconnect) { +		sk->sk_user_data = NULL;  		if (con->conid)  			s->tipc_conn_release(con->conid, con->usr_data); - -		/* We shouldn't flush pending works as we may be in the -		 * thread. In fact the races with pending rx/tx work structs -		 * are harmless for us here as we have already deleted this -		 * connection from server connection list. -		 */ -		if (con->sock) -			kernel_sock_shutdown(con->sock, SHUT_RDWR); -		conn_put(con);  	} +	write_unlock_bh(&sk->sk_callback_lock); + +	/* Handle concurrent calls from sending and receiving threads */ +	if (!disconnect) +		return; + +	/* Don't flush pending works, -just let them expire */ +	kernel_sock_shutdown(con->sock, SHUT_RDWR); +	conn_put(con);  }  static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) @@ -248,9 +242,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)  static int tipc_receive_from_sock(struct tipc_conn *con)  { -	struct msghdr msg = {};  	struct tipc_server *s = con->server; +	struct sock *sk = con->sock->sk;  	struct sockaddr_tipc addr; +	struct msghdr msg = {};  	struct kvec iov;  	void *buf;  	int ret; @@ -264,19 +259,22 @@ static int tipc_receive_from_sock(struct tipc_conn *con)  	iov.iov_base = buf;  	iov.iov_len = s->max_rcvbuf_size;  	msg.msg_name = &addr; -	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, -			     MSG_DONTWAIT); +	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len); +	ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);  	if (ret <= 0) {  		kmem_cache_free(s->rcvbuf_cache, buf);  		goto out_close;  	} -	s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, &addr, -			     con->usr_data, buf, ret); - +	read_lock_bh(&sk->sk_callback_lock); +	if (test_bit(CF_CONNECTED, &con->flags)) +		ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, +					   &addr, con->usr_data, buf, ret); +	read_unlock_bh(&sk->sk_callback_lock);  	kmem_cache_free(s->rcvbuf_cache, buf); - -	return 0; +	if (ret < 0) +		tipc_conn_terminate(s, con->conid); +	return ret;  out_close:  	if (ret != -EWOULDBLOCK) @@ -489,8 +487,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)  	}  } -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, -			     u32 lower, u32 upper, int *conid) +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, +			     u32 upper, u32 filter, int *conid)  {  	struct tipc_subscriber *scbr;  	struct tipc_subscr sub; @@ -501,7 +499,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,  	sub.seq.lower = lower;  	sub.seq.upper = upper;  	sub.timeout = TIPC_WAIT_FOREVER; -	sub.filter = TIPC_SUB_PORTS; +	sub.filter = filter;  	*(u32 *)&sub.usr_handle = port;  	con = tipc_alloc_conn(tipc_topsrv(net)); @@ -525,11 +523,17 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,  void tipc_topsrv_kern_unsubscr(struct net *net, int conid)  {  	struct tipc_conn *con; +	struct tipc_server *srv;  	con = tipc_conn_lookup(tipc_topsrv(net), conid);  	if (!con)  		return; -	tipc_close_conn(con); + +	test_and_clear_bit(CF_CONNECTED, &con->flags); +	srv = con->server; +	if (con->conid) +		srv->tipc_conn_release(con->conid, con->usr_data); +	conn_put(con);  	conn_put(con);  } diff --git a/net/tipc/server.h b/net/tipc/server.h index 2113c9192633..64df7513cd70 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -41,6 +41,9 @@  #include <net/net_namespace.h>  #define TIPC_SERVER_NAME_LEN	32 +#define TIPC_SUB_CLUSTER_SCOPE  0x20 +#define TIPC_SUB_NODE_SCOPE     0x40 +#define TIPC_SUB_NO_STATUS      0x80  /**   * struct tipc_server - TIPC server structure @@ -71,9 +74,9 @@ struct tipc_server {  	int max_rcvbuf_size;  	void *(*tipc_conn_new)(int conid);  	void (*tipc_conn_release)(int conid, void *usr_data); -	void (*tipc_conn_recvmsg)(struct net *net, int conid, -				  struct sockaddr_tipc *addr, void *usr_data, -				  void *buf, size_t len); +	int (*tipc_conn_recvmsg)(struct net *net, int conid, +				 struct sockaddr_tipc *addr, void *usr_data, +				 void *buf, size_t len);  	struct sockaddr_tipc *saddr;  	char name[TIPC_SERVER_NAME_LEN];  	int imp; @@ -83,8 +86,8 @@ struct tipc_server {  int tipc_conn_sendmsg(struct tipc_server *s, int conid,  		      struct sockaddr_tipc *addr, void *data, size_t len); -bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, -			     u32 lower, u32 upper, int *conid); +bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, +			     u32 upper, u32 filter, int *conid);  void tipc_topsrv_kern_unsubscr(struct net *net, int conid);  /** diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3b4084480377..163f3a547501 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -116,6 +116,7 @@ struct tipc_sock {  	struct tipc_mc_method mc_method;  	struct rcu_head rcu;  	struct tipc_group *group; +	bool group_is_open;  };  static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -710,13 +711,12 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,   * imply that the operation will succeed, merely that it should be performed   * and will not block.   */ -static unsigned int tipc_poll(struct file *file, struct socket *sock, +static __poll_t tipc_poll(struct file *file, struct socket *sock,  			      poll_table *wait)  {  	struct sock *sk = sock->sk;  	struct tipc_sock *tsk = tipc_sk(sk); -	struct tipc_group *grp = tsk->group; -	u32 revents = 0; +	__poll_t revents = 0;  	sock_poll_wait(file, sk_sleep(sk), wait); @@ -736,9 +736,8 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,  			revents |= POLLIN | POLLRDNORM;  		break;  	case TIPC_OPEN: -		if (!grp || tipc_group_size(grp)) -			if (!tsk->cong_link_cnt) -				revents |= POLLOUT; +		if (tsk->group_is_open && !tsk->cong_link_cnt) +			revents |= POLLOUT;  		if (!tipc_sk_type_connectionless(sk))  			break;  		if (skb_queue_empty(&sk->sk_receive_queue)) @@ -772,7 +771,6 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,  	struct net *net = sock_net(sk);  	int mtu = tipc_bcast_get_mtu(net);  	struct tipc_mc_method *method = &tsk->mc_method; -	u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);  	struct sk_buff_head pkts;  	struct tipc_nlist dsts;  	int rc; @@ -788,7 +786,7 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,  	/* Lookup destination nodes */  	tipc_nlist_init(&dsts, tipc_own_addr(net));  	tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, -				      seq->upper, domain, &dsts); +				      seq->upper, &dsts);  	if (!dsts.local && !dsts.remote)  		return -EHOSTUNREACH; @@ -928,21 +926,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,  	struct list_head *cong_links = &tsk->cong_links;  	int blks = tsk_blocks(GROUP_H_SIZE + dlen);  	struct tipc_group *grp = tsk->group; +	struct tipc_msg *hdr = &tsk->phdr;  	struct tipc_member *first = NULL;  	struct tipc_member *mbr = NULL;  	struct net *net = sock_net(sk);  	u32 node, port, exclude; -	u32 type, inst, domain;  	struct list_head dsts; +	u32 type, inst, scope;  	int lookups = 0;  	int dstcnt, rc;  	bool cong;  	INIT_LIST_HEAD(&dsts); -	type = dest->addr.name.name.type; +	type = msg_nametype(hdr);  	inst = dest->addr.name.name.instance; -	domain = addr_domain(net, dest->scope); +	scope = msg_lookup_scope(hdr);  	exclude = tipc_group_exclude(grp);  	while (++lookups < 4) { @@ -950,7 +949,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,  		/* Look for a non-congested destination member, if any */  		while (1) { -			if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, +			if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,  						 &dstcnt, exclude, false))  				return -EHOSTUNREACH;  			tipc_dest_pop(&dsts, &node, &port); @@ -1079,22 +1078,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,  {  	struct sock *sk = sock->sk;  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); -	struct tipc_name_seq *seq = &dest->addr.nameseq;  	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_group *grp = tsk->group; +	struct tipc_msg *hdr = &tsk->phdr;  	struct net *net = sock_net(sk); -	u32 domain, exclude, dstcnt; +	u32 type, inst, scope, exclude;  	struct list_head dsts; +	u32 dstcnt;  	INIT_LIST_HEAD(&dsts); -	if (seq->lower != seq->upper) -		return -ENOTSUPP; - -	domain = addr_domain(net, dest->scope); +	type = msg_nametype(hdr); +	inst = dest->addr.name.name.instance; +	scope = msg_lookup_scope(hdr);  	exclude = tipc_group_exclude(grp); -	if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, -				 &dsts, &dstcnt, exclude, true)) + +	if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts, +				 &dstcnt, exclude, true))  		return -EHOSTUNREACH;  	if (dstcnt == 1) { @@ -1116,24 +1116,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,  void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  		       struct sk_buff_head *inputq)  { -	u32 scope = TIPC_CLUSTER_SCOPE;  	u32 self = tipc_own_addr(net); +	u32 type, lower, upper, scope;  	struct sk_buff *skb, *_skb; -	u32 lower = 0, upper = ~0; -	struct sk_buff_head tmpq;  	u32 portid, oport, onode; +	struct sk_buff_head tmpq;  	struct list_head dports; -	struct tipc_msg *msg; -	int user, mtyp, hsz; +	struct tipc_msg *hdr; +	int user, mtyp, hlen; +	bool exact;  	__skb_queue_head_init(&tmpq);  	INIT_LIST_HEAD(&dports);  	skb = tipc_skb_peek(arrvq, &inputq->lock);  	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { -		msg = buf_msg(skb); -		user = msg_user(msg); -		mtyp = msg_type(msg); +		hdr = buf_msg(skb); +		user = msg_user(hdr); +		mtyp = msg_type(hdr); +		hlen = skb_headroom(skb) + msg_hdr_sz(hdr); +		oport = msg_origport(hdr); +		onode = msg_orignode(hdr); +		type = msg_nametype(hdr); +  		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {  			spin_lock_bh(&inputq->lock);  			if (skb_peek(arrvq) == skb) { @@ -1144,21 +1149,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,  			spin_unlock_bh(&inputq->lock);  			continue;  		} -		hsz = skb_headroom(skb) + msg_hdr_sz(msg); -		oport = msg_origport(msg); -		onode = msg_orignode(msg); -		if (onode == self) -			scope = TIPC_NODE_SCOPE; - -		/* Create destination port list and message clones: */ -		if (!msg_in_group(msg)) { -			lower = msg_namelower(msg); -			upper = msg_nameupper(msg); + +		/* Group messages require exact scope match */ +		if (msg_in_group(hdr)) { +			lower = 0; +			upper = ~0; +			scope = msg_lookup_scope(hdr); +			exact = true; +		} else { +			/* TIPC_NODE_SCOPE means "any scope" in this context */ +			if (onode == self) +				scope = TIPC_NODE_SCOPE; +			else +				scope = TIPC_CLUSTER_SCOPE; +			exact = false; +			lower = msg_namelower(hdr); +			upper = msg_nameupper(hdr);  		} -		tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, -					  scope, &dports); + +		/* Create destination port list: */ +		tipc_nametbl_mc_lookup(net, type, lower, upper, +				       scope, exact, &dports); + +		/* Clone message per destination */  		while (tipc_dest_pop(&dports, NULL, &portid)) { -			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC); +			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);  			if (_skb) {  				msg_set_destport(buf_msg(_skb), portid);  				__skb_queue_tail(&tmpq, _skb); @@ -1933,8 +1948,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,  		break;  	case TOP_SRV:  		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, -				      skb, inputq, xmitq); -		skb = NULL; +				      hdr, inputq, xmitq);  		break;  	default:  		break; @@ -2640,9 +2654,7 @@ void tipc_sk_reinit(struct net *net)  	rhashtable_walk_enter(&tn->sk_rht, &iter);  	do { -		tsk = ERR_PTR(rhashtable_walk_start(&iter)); -		if (IS_ERR(tsk)) -			goto walk_stop; +		rhashtable_walk_start(&iter);  		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {  			spin_lock_bh(&tsk->sk.sk_lock.slock); @@ -2651,7 +2663,7 @@ void tipc_sk_reinit(struct net *net)  			msg_set_orignode(msg, tn->own_addr);  			spin_unlock_bh(&tsk->sk.sk_lock.slock);  		} -walk_stop: +  		rhashtable_walk_stop(&iter);  	} while (tsk == ERR_PTR(-EAGAIN));  } @@ -2734,7 +2746,6 @@ void tipc_sk_rht_destroy(struct net *net)  static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)  {  	struct net *net = sock_net(&tsk->sk); -	u32 domain = addr_domain(net, mreq->scope);  	struct tipc_group *grp = tsk->group;  	struct tipc_msg *hdr = &tsk->phdr;  	struct tipc_name_seq seq; @@ -2742,9 +2753,11 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)  	if (mreq->type < TIPC_RESERVED_TYPES)  		return -EACCES; +	if (mreq->scope > TIPC_NODE_SCOPE) +		return -EINVAL;  	if (grp)  		return -EACCES; -	grp = tipc_group_create(net, tsk->portid, mreq); +	grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);  	if (!grp)  		return -ENOMEM;  	tsk->group = grp; @@ -2754,16 +2767,17 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)  	seq.type = mreq->type;  	seq.lower = mreq->instance;  	seq.upper = seq.lower; -	tipc_nametbl_build_group(net, grp, mreq->type, domain); +	tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);  	rc = tipc_sk_publish(tsk, mreq->scope, &seq);  	if (rc) {  		tipc_group_delete(net, grp);  		tsk->group = NULL; +		return rc;  	} - -	/* Eliminate any risk that a broadcast overtakes the sent JOIN */ +	/* Eliminate any risk that a broadcast overtakes sent JOINs */  	tsk->mc_method.rcast = true;  	tsk->mc_method.mandatory = true; +	tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);  	return rc;  } diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 251065dfd8df..68e26470c516 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,  void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,  				 u32 found_upper, u32 event, u32 port_ref, -				 u32 node, int must) +				 u32 node, u32 scope, int must)  { +	u32 filter = htohl(sub->evt.s.filter, sub->swap);  	struct tipc_name_seq seq;  	tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);  	if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))  		return; -	if (!must && -	    !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS)) +	if (!must && !(filter & TIPC_SUB_PORTS)) +		return; +	if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE) +		return; +	if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)  		return;  	tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, @@ -285,21 +289,21 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,  	return sub;  } -static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, -				   struct tipc_subscriber *subscriber, int swap) +static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, +				  struct tipc_subscriber *subscriber, int swap, +				  bool status)  { -	struct tipc_net *tn = net_generic(net, tipc_net_id);  	struct tipc_subscription *sub = NULL;  	u32 timeout;  	sub = tipc_subscrp_create(net, s, swap);  	if (!sub) -		return tipc_conn_terminate(tn->topsrv, subscriber->conid); +		return -1;  	spin_lock_bh(&subscriber->lock);  	list_add(&sub->subscrp_list, &subscriber->subscrp_list);  	sub->subscriber = subscriber; -	tipc_nametbl_subscribe(sub); +	tipc_nametbl_subscribe(sub, status);  	tipc_subscrb_get(subscriber);  	spin_unlock_bh(&subscriber->lock); @@ -308,6 +312,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,  	if (timeout != TIPC_WAIT_FOREVER)  		mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); +	return 0;  }  /* Handle one termination request for the subscriber */ @@ -317,12 +322,13 @@ static void tipc_subscrb_release_cb(int conid, void *usr_data)  }  /* Handle one request to create a new subscription for the subscriber */ -static void tipc_subscrb_rcv_cb(struct net *net, int conid, -				struct sockaddr_tipc *addr, void *usr_data, -				void *buf, size_t len) +static int tipc_subscrb_rcv_cb(struct net *net, int conid, +			       struct sockaddr_tipc *addr, void *usr_data, +			       void *buf, size_t len)  {  	struct tipc_subscriber *subscriber = usr_data;  	struct tipc_subscr *s = (struct tipc_subscr *)buf; +	bool status;  	int swap;  	/* Determine subscriber's endianness */ @@ -332,10 +338,11 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,  	/* Detect & process a subscription cancellation request */  	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {  		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); -		return tipc_subscrp_cancel(s, subscriber); +		tipc_subscrp_cancel(s, subscriber); +		return 0;  	} - -	tipc_subscrp_subscribe(net, s, subscriber, swap); +	status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap)); +	return tipc_subscrp_subscribe(net, s, subscriber, swap, status);  }  /* Handle one request to establish a new subscriber */ diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index ee52957dc952..f3edca775d9f 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,  			       u32 found_upper);  void tipc_subscrp_report_overlap(struct tipc_subscription *sub,  				 u32 found_lower, u32 found_upper, u32 event, -				 u32 port_ref, u32 node, int must); +				 u32 port_ref, u32 node, u32 scope, int must);  void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,  			      struct tipc_name_seq *out);  u32 tipc_subscrp_convert_seq_type(u32 type, int swap);  |