diff options
Diffstat (limited to 'net/tipc/group.c')
| -rw-r--r-- | net/tipc/group.c | 371 | 
1 files changed, 197 insertions, 174 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c index 5f4ffae807ee..122162a31816 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -49,8 +49,6 @@  #define ADV_ACTIVE (ADV_UNIT * 12)  enum mbr_state { -	MBR_QUARANTINED, -	MBR_DISCOVERED,  	MBR_JOINING,  	MBR_PUBLISHED,  	MBR_JOINED, @@ -64,8 +62,7 @@ enum mbr_state {  struct tipc_member {  	struct rb_node tree_node;  	struct list_head list; -	struct list_head congested; -	struct sk_buff *event_msg; +	struct list_head small_win;  	struct sk_buff_head deferredq;  	struct tipc_group *group;  	u32 node; @@ -77,21 +74,18 @@ struct tipc_member {  	u16 bc_rcv_nxt;  	u16 bc_syncpt;  	u16 bc_acked; -	bool usr_pending;  };  struct tipc_group {  	struct rb_root members; -	struct list_head congested; +	struct list_head small_win;  	struct list_head pending;  	struct list_head active; -	struct list_head reclaiming;  	struct tipc_nlist dests;  	struct net *net;  	int subid;  	u32 type;  	u32 instance; -	u32 domain;  	u32 scope;  	u32 portid;  	u16 member_cnt; @@ -99,6 +93,7 @@ struct tipc_group {  	u16 max_active;  	u16 bc_snd_nxt;  	u16 bc_ackers; +	bool *open;  	bool loopback;  	bool events;  }; @@ -106,6 +101,16 @@ struct tipc_group {  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq); +static void tipc_group_open(struct tipc_member *m, bool *wakeup) +{ +	*wakeup = false; +	if (list_empty(&m->small_win)) +		return; +	list_del_init(&m->small_win); +	*m->group->open = true; +	*wakeup = true; +} +  static void tipc_group_decr_active(struct tipc_group *grp,  				   struct tipc_member *m)  { @@ -137,14 +142,14 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)  	return grp->bc_snd_nxt;  } -static bool tipc_group_is_enabled(struct tipc_member *m) +static bool tipc_group_is_receiver(struct tipc_member *m)  { -	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +	return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;  } -static bool tipc_group_is_receiver(struct tipc_member *m) +static bool tipc_group_is_sender(struct tipc_member *m)  { -	return m && m->state >= MBR_JOINED; +	return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;  }  u32 tipc_group_exclude(struct tipc_group *grp) @@ -160,8 +165,11 @@ int tipc_group_size(struct tipc_group *grp)  }  struct tipc_group *tipc_group_create(struct net *net, u32 portid, -				     struct tipc_group_req *mreq) +				     struct tipc_group_req *mreq, +				     bool *group_is_open)  { +	u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; +	bool global = mreq->scope != TIPC_NODE_SCOPE;  	struct tipc_group *grp;  	u32 type = mreq->type; @@ -169,25 +177,41 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,  	if (!grp)  		return NULL;  	tipc_nlist_init(&grp->dests, tipc_own_addr(net)); -	INIT_LIST_HEAD(&grp->congested); +	INIT_LIST_HEAD(&grp->small_win);  	INIT_LIST_HEAD(&grp->active);  	INIT_LIST_HEAD(&grp->pending); -	INIT_LIST_HEAD(&grp->reclaiming);  	grp->members = RB_ROOT;  	grp->net = net;  	grp->portid = portid; -	grp->domain = addr_domain(net, mreq->scope);  	grp->type = type;  	grp->instance = mreq->instance;  	grp->scope = mreq->scope;  	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;  	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; -	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) +	grp->open = group_is_open; +	filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; +	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, +				    filter, &grp->subid))  		return grp;  	kfree(grp);  	return NULL;  } +void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) +{ +	struct rb_root *tree = &grp->members; +	struct tipc_member *m, *tmp; +	struct sk_buff_head xmitq; + +	skb_queue_head_init(&xmitq); +	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { +		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); +		tipc_group_update_member(m, 0); +	} +	tipc_node_distr_xmit(net, &xmitq); +	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp); +} +  void tipc_group_delete(struct net *net, struct tipc_group *grp)  {  	struct rb_root *tree = &grp->members; @@ -233,7 +257,7 @@ static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,  	struct tipc_member *m;  	m = tipc_group_find_member(grp, node, port); -	if (m && tipc_group_is_enabled(m)) +	if (m && tipc_group_is_receiver(m))  		return m;  	return NULL;  } @@ -278,7 +302,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,  static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  						    u32 node, u32 port, -						    int state) +						    u32 instance, int state)  {  	struct tipc_member *m; @@ -286,11 +310,12 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  	if (!m)  		return NULL;  	INIT_LIST_HEAD(&m->list); -	INIT_LIST_HEAD(&m->congested); +	INIT_LIST_HEAD(&m->small_win);  	__skb_queue_head_init(&m->deferredq);  	m->group = grp;  	m->node = node;  	m->port = port; +	m->instance = instance;  	m->bc_acked = grp->bc_snd_nxt - 1;  	grp->member_cnt++;  	tipc_group_add_to_tree(grp, m); @@ -299,9 +324,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,  	return m;  } -void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +void tipc_group_add_member(struct tipc_group *grp, u32 node, +			   u32 port, u32 instance)  { -	tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +	tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);  }  static void tipc_group_delete_member(struct tipc_group *grp, @@ -315,7 +341,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,  		grp->bc_ackers--;  	list_del_init(&m->list); -	list_del_init(&m->congested); +	list_del_init(&m->small_win);  	tipc_group_decr_active(grp, m);  	/* If last member on a node, remove node from dest list */ @@ -344,7 +370,7 @@ void tipc_group_update_member(struct tipc_member *m, int len)  	struct tipc_group *grp = m->group;  	struct tipc_member *_m, *tmp; -	if (!tipc_group_is_enabled(m)) +	if (!tipc_group_is_receiver(m))  		return;  	m->window -= len; @@ -352,16 +378,14 @@ void tipc_group_update_member(struct tipc_member *m, int len)  	if (m->window >= ADV_IDLE)  		return; -	list_del_init(&m->congested); +	list_del_init(&m->small_win); -	/* Sort member into congested members' list */ -	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { -		if (m->window > _m->window) -			continue; -		list_add_tail(&m->congested, &_m->congested); -		return; +	/* Sort member into small_window members' list */ +	list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { +		if (_m->window > m->window) +			break;  	} -	list_add_tail(&m->congested, &grp->congested); +	list_add_tail(&m->small_win, &_m->small_win);  }  void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) @@ -373,7 +397,7 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)  	for (n = rb_first(&grp->members); n; n = rb_next(n)) {  		m = container_of(n, struct tipc_member, tree_node); -		if (tipc_group_is_enabled(m)) { +		if (tipc_group_is_receiver(m)) {  			tipc_group_update_member(m, len);  			m->bc_acked = prev;  			ackers++; @@ -394,20 +418,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,  	int adv, state;  	m = tipc_group_find_dest(grp, dnode, dport); -	*mbr = m; -	if (!m) +	if (!tipc_group_is_receiver(m)) { +		*mbr = NULL;  		return false; -	if (m->usr_pending) -		return true; +	} +	*mbr = m; +  	if (m->window >= len)  		return false; -	m->usr_pending = true; + +	*grp->open = false;  	/* If not fully advertised, do it now to prevent mutual blocking */  	adv = m->advertised;  	state = m->state; -	if (state < MBR_JOINED) -		return true;  	if (state == MBR_JOINED && adv == ADV_IDLE)  		return true;  	if (state == MBR_ACTIVE && adv == ADV_ACTIVE) @@ -425,13 +449,14 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)  	struct tipc_member *m = NULL;  	/* If prev bcast was replicast, reject until all receivers have acked */ -	if (grp->bc_ackers) +	if (grp->bc_ackers) { +		*grp->open = false;  		return true; - -	if (list_empty(&grp->congested)) +	} +	if (list_empty(&grp->small_win))  		return false; -	m = list_first_entry(&grp->congested, struct tipc_member, congested); +	m = list_first_entry(&grp->small_win, struct tipc_member, small_win);  	if (m->window >= len)  		return false; @@ -486,7 +511,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,  		goto drop;  	m = tipc_group_find_member(grp, node, port); -	if (!tipc_group_is_receiver(m)) +	if (!tipc_group_is_sender(m))  		goto drop;  	if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) @@ -573,24 +598,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  	switch (m->state) {  	case MBR_JOINED: -		/* Reclaim advertised space from least active member */ -		if (!list_empty(active) && active_cnt >= reclaim_limit) { +		/* First, decide if member can go active */ +		if (active_cnt <= max_active) { +			m->state = MBR_ACTIVE; +			list_add_tail(&m->list, active); +			grp->active_cnt++; +			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +		} else { +			m->state = MBR_PENDING; +			list_add_tail(&m->list, &grp->pending); +		} + +		if (active_cnt < reclaim_limit) +			break; + +		/* Reclaim from oldest active member, if possible */ +		if (!list_empty(active)) {  			rm = list_first_entry(active, struct tipc_member, list);  			rm->state = MBR_RECLAIMING; -			list_move_tail(&rm->list, &grp->reclaiming); +			list_del_init(&rm->list);  			tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); -		} -		/* If max active, become pending and wait for reclaimed space */ -		if (active_cnt >= max_active) { -			m->state = MBR_PENDING; -			list_add_tail(&m->list, &grp->pending);  			break;  		} -		/* Otherwise become active */ -		m->state = MBR_ACTIVE; -		list_add_tail(&m->list, &grp->active); -		grp->active_cnt++; -		/* Fall through */ +		/* Nobody to reclaim from; - revert oldest pending to JOINED */ +		pm = list_first_entry(&grp->pending, struct tipc_member, list); +		list_del_init(&pm->list); +		pm->state = MBR_JOINED; +		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); +		break;  	case MBR_ACTIVE:  		if (!list_is_last(&m->list, &grp->active))  			list_move_tail(&m->list, &grp->active); @@ -602,12 +637,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  		if (m->advertised > ADV_IDLE)  			break;  		m->state = MBR_JOINED; +		grp->active_cnt--;  		if (m->advertised < ADV_IDLE) {  			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");  			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);  		} -		grp->active_cnt--; -		list_del_init(&m->list); +  		if (list_empty(&grp->pending))  			return; @@ -619,7 +654,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);  		break;  	case MBR_RECLAIMING: -	case MBR_DISCOVERED:  	case MBR_JOINING:  	case MBR_LEAVING:  	default: @@ -627,6 +661,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,  	}  } +static void tipc_group_create_event(struct tipc_group *grp, +				    struct tipc_member *m, +				    u32 event, u16 seqno, +				    struct sk_buff_head *inputq) +{	u32 dnode = tipc_own_addr(grp->net); +	struct tipc_event evt; +	struct sk_buff *skb; +	struct tipc_msg *hdr; + +	evt.event = event; +	evt.found_lower = m->instance; +	evt.found_upper = m->instance; +	evt.port.ref = m->port; +	evt.port.node = m->node; +	evt.s.seq.type = grp->type; +	evt.s.seq.lower = m->instance; +	evt.s.seq.upper = m->instance; + +	skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, +			      GROUP_H_SIZE, sizeof(evt), dnode, m->node, +			      grp->portid, m->port, 0); +	if (!skb) +		return; + +	hdr = buf_msg(skb); +	msg_set_nametype(hdr, grp->type); +	msg_set_grp_evt(hdr, event); +	msg_set_dest_droppable(hdr, true); +	msg_set_grp_bc_seqno(hdr, seqno); +	memcpy(msg_data(hdr), &evt, sizeof(evt)); +	TIPC_SKB_CB(skb)->orig_member = m->instance; +	__skb_queue_tail(inputq, skb); +} +  static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,  				  int mtyp, struct sk_buff_head *xmitq)  { @@ -672,83 +740,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  	u32 node = msg_orignode(hdr);  	u32 port = msg_origport(hdr);  	struct tipc_member *m, *pm; -	struct tipc_msg *ehdr;  	u16 remitted, in_flight;  	if (!grp)  		return; +	if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) +		return; +  	m = tipc_group_find_member(grp, node, port);  	switch (msg_type(hdr)) {  	case GRP_JOIN_MSG:  		if (!m)  			m = tipc_group_create_member(grp, node, port, -						     MBR_QUARANTINED); +						     0, MBR_JOINING);  		if (!m)  			return;  		m->bc_syncpt = msg_grp_bc_syncpt(hdr);  		m->bc_rcv_nxt = m->bc_syncpt;  		m->window += msg_adv_win(hdr); -		/* Wait until PUBLISH event is received */ -		if (m->state == MBR_DISCOVERED) { -			m->state = MBR_JOINING; -		} else if (m->state == MBR_PUBLISHED) { -			m->state = MBR_JOINED; -			*usr_wakeup = true; -			m->usr_pending = false; -			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); -			ehdr = buf_msg(m->event_msg); -			msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); -			__skb_queue_tail(inputq, m->event_msg); -		} -		list_del_init(&m->congested); +		/* Wait until PUBLISH event is received if necessary */ +		if (m->state != MBR_PUBLISHED) +			return; + +		/* Member can be taken into service */ +		m->state = MBR_JOINED; +		tipc_group_open(m, usr_wakeup);  		tipc_group_update_member(m, 0); +		tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +		tipc_group_create_event(grp, m, TIPC_PUBLISHED, +					m->bc_syncpt, inputq);  		return;  	case GRP_LEAVE_MSG:  		if (!m)  			return;  		m->bc_syncpt = msg_grp_bc_syncpt(hdr);  		list_del_init(&m->list); -		list_del_init(&m->congested); -		*usr_wakeup = true; - -		/* Wait until WITHDRAW event is received */ -		if (m->state != MBR_LEAVING) { -			tipc_group_decr_active(grp, m); -			m->state = MBR_LEAVING; -			return; -		} -		/* Otherwise deliver already received WITHDRAW event */ -		ehdr = buf_msg(m->event_msg); -		msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); -		__skb_queue_tail(inputq, m->event_msg); +		tipc_group_open(m, usr_wakeup); +		tipc_group_decr_active(grp, m); +		m->state = MBR_LEAVING; +		tipc_group_create_event(grp, m, TIPC_WITHDRAWN, +					m->bc_syncpt, inputq);  		return;  	case GRP_ADV_MSG:  		if (!m)  			return;  		m->window += msg_adv_win(hdr); -		*usr_wakeup = m->usr_pending; -		m->usr_pending = false; -		list_del_init(&m->congested); +		tipc_group_open(m, usr_wakeup);  		return;  	case GRP_ACK_MSG:  		if (!m)  			return;  		m->bc_acked = msg_grp_bc_acked(hdr);  		if (--grp->bc_ackers) -			break; +			return; +		list_del_init(&m->small_win); +		*m->group->open = true;  		*usr_wakeup = true; -		m->usr_pending = false; +		tipc_group_update_member(m, 0);  		return;  	case GRP_RECLAIM_MSG:  		if (!m)  			return; -		*usr_wakeup = m->usr_pending; -		m->usr_pending = false;  		tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);  		m->window = ADV_IDLE; +		tipc_group_open(m, usr_wakeup);  		return;  	case GRP_REMIT_MSG:  		if (!m || m->state != MBR_RECLAIMING) @@ -763,18 +821,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  			m->advertised = ADV_IDLE + in_flight;  			return;  		} -		/* All messages preceding the REMIT have been read */ -		if (m->advertised <= remitted) { -			m->state = MBR_JOINED; -			in_flight = 0; -		} -		/* ..and the REMIT overtaken by more messages => re-advertise */ +		/* This should never happen */  		if (m->advertised < remitted) -			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); +			pr_warn_ratelimited("Unexpected REMIT msg\n"); -		m->advertised = ADV_IDLE + in_flight; +		/* All messages preceding the REMIT have been read */ +		m->state = MBR_JOINED;  		grp->active_cnt--; -		list_del_init(&m->list); +		m->advertised = ADV_IDLE;  		/* Set oldest pending member to active and advertise */  		if (list_empty(&grp->pending)) @@ -796,11 +850,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,  void tipc_group_member_evt(struct tipc_group *grp,  			   bool *usr_wakeup,  			   int *sk_rcvbuf, -			   struct sk_buff *skb, +			   struct tipc_msg *hdr,  			   struct sk_buff_head *inputq,  			   struct sk_buff_head *xmitq)  { -	struct tipc_msg *hdr = buf_msg(skb);  	struct tipc_event *evt = (void *)msg_data(hdr);  	u32 instance = evt->found_lower;  	u32 node = evt->port.node; @@ -808,89 +861,59 @@ void tipc_group_member_evt(struct tipc_group *grp,  	int event = evt->event;  	struct tipc_member *m;  	struct net *net; -	bool node_up;  	u32 self;  	if (!grp) -		goto drop; +		return;  	net = grp->net;  	self = tipc_own_addr(net);  	if (!grp->loopback && node == self && port == grp->portid) -		goto drop; - -	/* Convert message before delivery to user */ -	msg_set_hdr_sz(hdr, GROUP_H_SIZE); -	msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); -	msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); -	msg_set_origport(hdr, port); -	msg_set_orignode(hdr, node); -	msg_set_nametype(hdr, grp->type); -	msg_set_grp_evt(hdr, event); +		return;  	m = tipc_group_find_member(grp, node, port); -	if (event == TIPC_PUBLISHED) { -		if (!m) -			m = tipc_group_create_member(grp, node, port, -						     MBR_DISCOVERED); -		if (!m) -			goto drop; - -		/* Hold back event if JOIN message not yet received */ -		if (m->state == MBR_DISCOVERED) { -			m->event_msg = skb; -			m->state = MBR_PUBLISHED; -		} else { -			msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -			__skb_queue_tail(inputq, skb); -			m->state = MBR_JOINED; -			*usr_wakeup = true; -			m->usr_pending = false; +	switch (event) { +	case TIPC_PUBLISHED: +		/* Send and wait for arrival of JOIN message if necessary */ +		if (!m) { +			m = tipc_group_create_member(grp, node, port, instance, +						     MBR_PUBLISHED); +			if (!m) +				break; +			tipc_group_update_member(m, 0); +			tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); +			break;  		} + +		if (m->state != MBR_JOINING) +			break; + +		/* Member can be taken into service */  		m->instance = instance; -		TIPC_SKB_CB(skb)->orig_member = m->instance; +		m->state = MBR_JOINED; +		tipc_group_open(m, usr_wakeup); +		tipc_group_update_member(m, 0);  		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); -		if (m->window < ADV_IDLE) -			tipc_group_update_member(m, 0); -		else -			list_del_init(&m->congested); -	} else if (event == TIPC_WITHDRAWN) { +		tipc_group_create_event(grp, m, TIPC_PUBLISHED, +					m->bc_syncpt, inputq); +		break; +	case TIPC_WITHDRAWN:  		if (!m) -			goto drop; - -		TIPC_SKB_CB(skb)->orig_member = m->instance; +			break; -		*usr_wakeup = true; -		m->usr_pending = false; -		node_up = tipc_node_is_up(net, node); -		m->event_msg = NULL; - -		if (node_up) { -			/* Hold back event if a LEAVE msg should be expected */ -			if (m->state != MBR_LEAVING) { -				m->event_msg = skb; -				tipc_group_decr_active(grp, m); -				m->state = MBR_LEAVING; -			} else { -				msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -				__skb_queue_tail(inputq, skb); -			} -		} else { -			if (m->state != MBR_LEAVING) { -				tipc_group_decr_active(grp, m); -				m->state = MBR_LEAVING; -				msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); -			} else { -				msg_set_grp_bc_seqno(hdr, m->bc_syncpt); -			} -			__skb_queue_tail(inputq, skb); -		} +		tipc_group_decr_active(grp, m); +		m->state = MBR_LEAVING;  		list_del_init(&m->list); -		list_del_init(&m->congested); +		tipc_group_open(m, usr_wakeup); + +		/* Only send event if no LEAVE message can be expected */ +		if (!tipc_node_is_up(net, node)) +			tipc_group_create_event(grp, m, TIPC_WITHDRAWN, +						m->bc_rcv_nxt, inputq); +		break; +	default: +		break;  	}  	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp); -	return; -drop: -	kfree_skb(skb);  }  |