aboutsummaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c272
-rw-r--r--net/tipc/bcast.h12
-rw-r--r--net/tipc/core.c2
-rw-r--r--net/tipc/core.h3
-rw-r--r--net/tipc/link.c8
-rw-r--r--net/tipc/msg.h10
-rw-r--r--net/tipc/netlink.c26
-rw-r--r--net/tipc/node.c18
-rw-r--r--net/tipc/node.h6
-rw-r--r--net/tipc/socket.c6
10 files changed, 334 insertions, 29 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d8026543bf4c..76e14dc08bb9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -54,7 +54,9 @@ const char tipc_bclink_name[] = "broadcast-link";
* @dests: array keeping number of reachable destinations per bearer
* @primary_bearer: a bearer having links to all broadcast destinations, if any
* @bcast_support: indicates if primary bearer, if any, supports broadcast
+ * @force_bcast: forces broadcast for multicast traffic
* @rcast_support: indicates if all peer nodes support replicast
+ * @force_rcast: forces replicast for multicast traffic
* @rc_ratio: dest count as percentage of cluster size where send method changes
* @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
*/
@@ -64,7 +66,9 @@ struct tipc_bc_base {
int dests[MAX_BEARERS];
int primary_bearer;
bool bcast_support;
+ bool force_bcast;
bool rcast_support;
+ bool force_rcast;
int rc_ratio;
int bc_threshold;
};
@@ -216,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct net *net, int dests,
}
/* Can current method be changed ? */
method->expires = jiffies + TIPC_METHOD_EXPIRE;
- if (method->mandatory || time_before(jiffies, exp))
+ if (method->mandatory)
return;
+ if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
+ time_before(jiffies, exp))
+ return;
+
+ /* Configuration as force 'broadcast' method */
+ if (bb->force_bcast) {
+ method->rcast = false;
+ return;
+ }
+ /* Configuration as force 'replicast' method */
+ if (bb->force_rcast) {
+ method->rcast = true;
+ return;
+ }
+ /* Configuration as 'autoselect' or default method */
/* Determine method to use now */
method->rcast = dests <= bb->bc_threshold;
}
@@ -281,6 +300,63 @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
return 0;
}
+/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
+ * @net: the applicable net namespace
+ * @skb: socket buffer to copy
+ * @method: send method to be used
+ * @dests: destination nodes for message.
+ * @cong_link_cnt: returns number of encountered congested destination links
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
+ struct tipc_mc_method *method,
+ struct tipc_nlist *dests,
+ u16 *cong_link_cnt)
+{
+ struct tipc_msg *hdr, *_hdr;
+ struct sk_buff_head tmpq;
+ struct sk_buff *_skb;
+
+ /* Is a cluster supporting with new capabilities ? */
+ if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
+ return 0;
+
+ hdr = buf_msg(skb);
+ if (msg_user(hdr) == MSG_FRAGMENTER)
+ hdr = msg_get_wrapped(hdr);
+ if (msg_type(hdr) != TIPC_MCAST_MSG)
+ return 0;
+
+ /* Allocate dummy message */
+ _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
+ if (!_skb)
+ return -ENOMEM;
+
+ /* Preparing for 'synching' header */
+ msg_set_syn(hdr, 1);
+
+ /* Copy skb's header into a dummy header */
+ skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
+ skb_orphan(_skb);
+
+ /* Reverse method for dummy message */
+ _hdr = buf_msg(_skb);
+ msg_set_size(_hdr, MCAST_H_SIZE);
+ msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
+
+ skb_queue_head_init(&tmpq);
+ __skb_queue_tail(&tmpq, _skb);
+ if (method->rcast)
+ tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
+ else
+ tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
+
+ /* This queue should normally be empty by now */
+ __skb_queue_purge(&tmpq);
+
+ return 0;
+}
+
/* tipc_mcast_xmit - deliver message to indicated destination nodes
* and to identified node local sockets
* @net: the applicable net namespace
@@ -296,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
u16 *cong_link_cnt)
{
struct sk_buff_head inputq, localq;
+ bool rcast = method->rcast;
+ struct tipc_msg *hdr;
+ struct sk_buff *skb;
int rc = 0;
skb_queue_head_init(&inputq);
@@ -309,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
/* Send according to determined transmit method */
if (dests->remote) {
tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+ skb = skb_peek(pkts);
+ hdr = buf_msg(skb);
+ if (msg_user(hdr) == MSG_FRAGMENTER)
+ hdr = msg_get_wrapped(hdr);
+ msg_set_is_rcast(hdr, method->rcast);
+
+ /* Switch method ? */
+ if (rcast != method->rcast)
+ tipc_mcast_send_sync(net, skb, method,
+ dests, cong_link_cnt);
+
if (method->rcast)
rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
else
@@ -485,10 +576,63 @@ static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
return 0;
}
+static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+
+ switch (bc_mode) {
+ case BCLINK_MODE_BCAST:
+ if (!bb->bcast_support)
+ return -ENOPROTOOPT;
+
+ bb->force_bcast = true;
+ bb->force_rcast = false;
+ break;
+ case BCLINK_MODE_RCAST:
+ if (!bb->rcast_support)
+ return -ENOPROTOOPT;
+
+ bb->force_bcast = false;
+ bb->force_rcast = true;
+ break;
+ case BCLINK_MODE_SEL:
+ if (!bb->bcast_support || !bb->rcast_support)
+ return -ENOPROTOOPT;
+
+ bb->force_bcast = false;
+ bb->force_rcast = false;
+ break;
+ default:
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+
+ if (!bb->bcast_support || !bb->rcast_support)
+ return -ENOPROTOOPT;
+
+ if (bc_ratio > 100 || bc_ratio <= 0)
+ return -EINVAL;
+
+ bb->rc_ratio = bc_ratio;
+ tipc_bcast_lock(net);
+ tipc_bcbase_calc_bc_threshold(net);
+ tipc_bcast_unlock(net);
+
+ return 0;
+}
+
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
int err;
u32 win;
+ u32 bc_mode;
+ u32 bc_ratio;
struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
if (!attrs[TIPC_NLA_LINK_PROP])
@@ -498,12 +642,28 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
if (err)
return err;
- if (!props[TIPC_NLA_PROP_WIN])
+ if (!props[TIPC_NLA_PROP_WIN] &&
+ !props[TIPC_NLA_PROP_BROADCAST] &&
+ !props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
return -EOPNOTSUPP;
+ }
+
+ if (props[TIPC_NLA_PROP_BROADCAST]) {
+ bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]);
+ err = tipc_bc_link_set_broadcast_mode(net, bc_mode);
+ }
+
+ if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
+ bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]);
+ err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio);
+ }
- win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+ if (!err && props[TIPC_NLA_PROP_WIN]) {
+ win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+ err = tipc_bc_link_set_queue_limits(net, win);
+ }
- return tipc_bc_link_set_queue_limits(net, win);
+ return err;
}
int tipc_bcast_init(struct net *net)
@@ -529,7 +689,7 @@ int tipc_bcast_init(struct net *net)
goto enomem;
bb->link = l;
tn->bcl = l;
- bb->rc_ratio = 25;
+ bb->rc_ratio = 10;
bb->rcast_support = true;
return 0;
enomem:
@@ -576,3 +736,105 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
nl->remote = 0;
nl->local = false;
}
+
+u32 tipc_bcast_get_broadcast_mode(struct net *net)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+
+ if (bb->force_bcast)
+ return BCLINK_MODE_BCAST;
+
+ if (bb->force_rcast)
+ return BCLINK_MODE_RCAST;
+
+ if (bb->bcast_support && bb->rcast_support)
+ return BCLINK_MODE_SEL;
+
+ return 0;
+}
+
+u32 tipc_bcast_get_broadcast_ratio(struct net *net)
+{
+ struct tipc_bc_base *bb = tipc_bc_base(net);
+
+ return bb->rc_ratio;
+}
+
+void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
+ struct sk_buff_head *inputq)
+{
+ struct sk_buff *skb, *_skb, *tmp;
+ struct tipc_msg *hdr, *_hdr;
+ bool match = false;
+ u32 node, port;
+
+ skb = skb_peek(inputq);
+ hdr = buf_msg(skb);
+
+ if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
+ return;
+
+ node = msg_orignode(hdr);
+ if (node == tipc_own_addr(net))
+ return;
+
+ port = msg_origport(hdr);
+
+ /* Has the twin SYN message already arrived ? */
+ skb_queue_walk(defq, _skb) {
+ _hdr = buf_msg(_skb);
+ if (msg_orignode(_hdr) != node)
+ continue;
+ if (msg_origport(_hdr) != port)
+ continue;
+ match = true;
+ break;
+ }
+
+ if (!match) {
+ if (!msg_is_syn(hdr))
+ return;
+ __skb_dequeue(inputq);
+ __skb_queue_tail(defq, skb);
+ return;
+ }
+
+ /* Deliver non-SYN message from other link, otherwise queue it */
+ if (!msg_is_syn(hdr)) {
+ if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
+ return;
+ __skb_dequeue(inputq);
+ __skb_queue_tail(defq, skb);
+ return;
+ }
+
+ /* Queue non-SYN/SYN message from same link */
+ if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
+ __skb_dequeue(inputq);
+ __skb_queue_tail(defq, skb);
+ return;
+ }
+
+ /* Matching SYN messages => return the one with data, if any */
+ __skb_unlink(_skb, defq);
+ if (msg_data_sz(hdr)) {
+ kfree_skb(_skb);
+ } else {
+ __skb_dequeue(inputq);
+ kfree_skb(skb);
+ __skb_queue_tail(inputq, _skb);
+ }
+
+ /* Deliver subsequent non-SYN messages from same peer */
+ skb_queue_walk_safe(defq, _skb, tmp) {
+ _hdr = buf_msg(_skb);
+ if (msg_orignode(_hdr) != node)
+ continue;
+ if (msg_origport(_hdr) != port)
+ continue;
+ if (msg_is_syn(_hdr))
+ break;
+ __skb_unlink(_skb, defq);
+ __skb_queue_tail(inputq, _skb);
+ }
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 751530ab0c49..dadad953e2be 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -48,6 +48,10 @@ extern const char tipc_bclink_name[];
#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
+#define BCLINK_MODE_BCAST 0x1
+#define BCLINK_MODE_RCAST 0x2
+#define BCLINK_MODE_SEL 0x4
+
struct tipc_nlist {
struct list_head list;
u32 self;
@@ -63,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
/* Cookie to be used between socket and broadcast layer
* @rcast: replicast (instead of broadcast) was used at previous xmit
* @mandatory: broadcast/replicast indication was set by user
+ * @deferredq: defer queue to make message in order
* @expires: re-evaluate non-mandatory transmit method if we are past this
*/
struct tipc_mc_method {
bool rcast;
bool mandatory;
+ struct sk_buff_head deferredq;
unsigned long expires;
};
@@ -92,6 +98,12 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net);
+u32 tipc_bcast_get_broadcast_mode(struct net *net);
+u32 tipc_bcast_get_broadcast_ratio(struct net *net);
+
+void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
+ struct sk_buff_head *inputq);
+
static inline void tipc_bcast_lock(struct net *net)
{
spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 5b38f5164281..27cccd101ef6 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -43,6 +43,7 @@
#include "net.h"
#include "socket.h"
#include "bcast.h"
+#include "node.h"
#include <linux/module.h>
@@ -59,6 +60,7 @@ static int __net_init tipc_init_net(struct net *net)
tn->node_addr = 0;
tn->trial_addr = 0;
tn->addr_trial_end = 0;
+ tn->capabilities = TIPC_NODE_CAPABILITIES;
memset(tn->node_id, 0, sizeof(tn->node_id));
memset(tn->node_id_string, 0, sizeof(tn->node_id_string));
tn->mon_threshold = TIPC_DEF_MON_THRESHOLD;
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 8020a6c360ff..7a68e1b6a066 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -122,6 +122,9 @@ struct tipc_net {
/* Topology subscription server */
struct tipc_topsrv *topsrv;
atomic_t subscription_count;
+
+ /* Cluster capabilities */
+ u16 capabilities;
};
static inline struct tipc_net *tipc_net(struct net *net)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 341ecd796aa4..52d23b3ffaf5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -2197,6 +2197,8 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
struct nlattr *attrs;
struct nlattr *prop;
struct tipc_net *tn = net_generic(net, tipc_net_id);
+ u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
+ u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
struct tipc_link *bcl = tn->bcl;
if (!bcl)
@@ -2233,6 +2235,12 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
goto attr_msg_full;
if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
goto prop_msg_full;
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_BROADCAST, bc_mode))
+ goto prop_msg_full;
+ if (bc_mode & BCLINK_MODE_SEL)
+ if (nla_put_u32(msg->skb, TIPC_NLA_PROP_BROADCAST_RATIO,
+ bc_ratio))
+ goto prop_msg_full;
nla_nest_end(msg->skb, prop);
err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7e4b8b93f9d..528ba9241acc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
msg_set_bits(m, 0, 18, 1, d);
}
+static inline bool msg_is_rcast(struct tipc_msg *m)
+{
+ return msg_bits(m, 0, 18, 0x1);
+}
+
+static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
+{
+ msg_set_bits(m, 0, 18, 0x1, d);
+}
+
static inline void msg_set_size(struct tipc_msg *m, u32 sz)
{
m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index 99ee419210ba..2d178df0a89f 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -110,7 +110,9 @@ const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
[TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC },
[TIPC_NLA_PROP_PRIO] = { .type = NLA_U32 },
[TIPC_NLA_PROP_TOL] = { .type = NLA_U32 },
- [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
+ [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 },
+ [TIPC_NLA_PROP_BROADCAST] = { .type = NLA_U32 },
+ [TIPC_NLA_PROP_BROADCAST_RATIO] = { .type = NLA_U32 }
};
const struct nla_policy tipc_nl_bearer_policy[TIPC_NLA_BEARER_MAX + 1] = {
@@ -142,114 +144,93 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
{
.cmd = TIPC_NL_BEARER_DISABLE,
.doit = tipc_nl_bearer_disable,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_BEARER_ENABLE,
.doit = tipc_nl_bearer_enable,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_BEARER_GET,
.doit = tipc_nl_bearer_get,
.dumpit = tipc_nl_bearer_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_BEARER_ADD,
.doit = tipc_nl_bearer_add,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_BEARER_SET,
.doit = tipc_nl_bearer_set,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_SOCK_GET,
.start = tipc_dump_start,
.dumpit = tipc_nl_sk_dump,
.done = tipc_dump_done,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_PUBL_GET,
.dumpit = tipc_nl_publ_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_LINK_GET,
.doit = tipc_nl_node_get_link,
.dumpit = tipc_nl_node_dump_link,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_LINK_SET,
.doit = tipc_nl_node_set_link,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_LINK_RESET_STATS,
.doit = tipc_nl_node_reset_link_stats,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_MEDIA_GET,
.doit = tipc_nl_media_get,
.dumpit = tipc_nl_media_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_MEDIA_SET,
.doit = tipc_nl_media_set,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_NODE_GET,
.dumpit = tipc_nl_node_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_NET_GET,
.dumpit = tipc_nl_net_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_NET_SET,
.doit = tipc_nl_net_set,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_NAME_TABLE_GET,
.dumpit = tipc_nl_name_table_dump,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_MON_SET,
.doit = tipc_nl_node_set_monitor,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_MON_GET,
.doit = tipc_nl_node_get_monitor,
.dumpit = tipc_nl_node_dump_monitor,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_MON_PEER_GET,
.dumpit = tipc_nl_node_dump_monitor_peer,
- .policy = tipc_nl_policy,
},
{
.cmd = TIPC_NL_PEER_REMOVE,
.doit = tipc_nl_peer_rm,
- .policy = tipc_nl_policy,
},
#ifdef CONFIG_TIPC_MEDIA_UDP
{
.cmd = TIPC_NL_UDP_GET_REMOTEIP,
.dumpit = tipc_udp_nl_dump_remoteip,
- .policy = tipc_nl_policy,
},
#endif
};
@@ -259,6 +240,7 @@ struct genl_family tipc_genl_family __ro_after_init = {
.version = TIPC_GENL_V2_VERSION,
.hdrsize = 0,
.maxattr = TIPC_NLA_MAX,
+ .policy = tipc_nl_policy,
.netnsok = true,
.module = THIS_MODULE,
.ops = tipc_genl_v2_ops,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index dd3b6dc17662..3469b5d4ed32 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
tipc_link_update_caps(l, capabilities);
}
write_unlock_bh(&n->lock);
+ /* Calculate cluster capabilities */
+ tn->capabilities = TIPC_NODE_CAPABILITIES;
+ list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+ tn->capabilities &= temp_node->capabilities;
+ }
goto exit;
}
n = kzalloc(sizeof(*n), GFP_ATOMIC);
@@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
break;
}
list_add_tail_rcu(&n->list, &temp_node->list);
+ /* Calculate cluster capabilities */
+ tn->capabilities = TIPC_NODE_CAPABILITIES;
+ list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+ tn->capabilities &= temp_node->capabilities;
+ }
trace_tipc_node_create(n, true, " ");
exit:
spin_unlock_bh(&tn->node_list_lock);
@@ -589,6 +599,7 @@ static void tipc_node_clear_links(struct tipc_node *node)
*/
static bool tipc_node_cleanup(struct tipc_node *peer)
{
+ struct tipc_node *temp_node;
struct tipc_net *tn = tipc_net(peer->net);
bool deleted = false;
@@ -604,6 +615,13 @@ static bool tipc_node_cleanup(struct tipc_node *peer)
deleted = true;
}
tipc_node_write_unlock(peer);
+
+ /* Calculate cluster capabilities */
+ tn->capabilities = TIPC_NODE_CAPABILITIES;
+ list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+ tn->capabilities &= temp_node->capabilities;
+ }
+
spin_unlock_bh(&tn->node_list_lock);
return deleted;
}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 4f59a30e989a..2404225c5d58 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -51,7 +51,8 @@ enum {
TIPC_BLOCK_FLOWCTL = (1 << 3),
TIPC_BCAST_RCAST = (1 << 4),
TIPC_NODE_ID128 = (1 << 5),
- TIPC_LINK_PROTO_SEQNO = (1 << 6)
+ TIPC_LINK_PROTO_SEQNO = (1 << 6),
+ TIPC_MCAST_RBCTL = (1 << 7)
};
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -60,7 +61,8 @@ enum {
TIPC_BCAST_RCAST | \
TIPC_BLOCK_FLOWCTL | \
TIPC_NODE_ID128 | \
- TIPC_LINK_PROTO_SEQNO)
+ TIPC_LINK_PROTO_SEQNO | \
+ TIPC_MCAST_RBCTL)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b542f14ed444..8ac8ddf1e324 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -485,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
tsk_set_unreliable(tsk, true);
+ __skb_queue_head_init(&tsk->mc_method.deferredq);
}
trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -582,6 +583,7 @@ static int tipc_release(struct socket *sock)
sk->sk_shutdown = SHUTDOWN_MASK;
tipc_sk_leave(tsk);
tipc_sk_withdraw(tsk, 0, NULL);
+ __skb_queue_purge(&tsk->mc_method.deferredq);
sk_stop_timer(sk, &sk->sk_timer);
tipc_sk_remove(tsk);
@@ -2149,6 +2151,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
struct tipc_msg *hdr = buf_msg(skb);
struct net *net = sock_net(sk);
struct sk_buff_head inputq;
+ int mtyp = msg_type(hdr);
int limit, err = TIPC_OK;
trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
@@ -2162,6 +2165,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
if (unlikely(grp))
tipc_group_filter_msg(grp, &inputq, xmitq);
+ if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
+ tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
+
/* Validate and add to receive buffer if there is space */
while ((skb = __skb_dequeue(&inputq))) {
hdr = buf_msg(skb);