Commit 8348500f authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: add option to suppress PUBLISH events for pre-existing publications

Currently, when a user is subscribing for binding table publications,
he will receive a PUBLISH event for all already existing matching items
in the binding table.

However, a group socket making a subscriptions doesn't need this initial
status update from the binding table, because it has already scanned it
during the join operation. Worse, the multiplicatory effect of issuing
mutual events for dozens or hundreds group members within a short time
frame put a heavy load on the topology server, with the end result that
scale out operations on a big group tend to take much longer than needed.

We now add a new filter option, TIPC_SUB_NO_STATUS, for topology server
subscriptions, so that this initial avalanche of events is suppressed.
This change, along with the previous commit, significantly improves the
range and speed of group scale out operations.

We keep the new option internal for the tipc driver, at least for now.
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent d12d2e12
...@@ -177,7 +177,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, ...@@ -177,7 +177,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
grp->scope = mreq->scope; grp->scope = mreq->scope;
grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) if (tipc_topsrv_kern_subscr(net, portid, type,
TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS,
0, ~0, &grp->subid))
return grp; return grp;
kfree(grp); kfree(grp);
return NULL; return NULL;
......
...@@ -405,12 +405,13 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net, ...@@ -405,12 +405,13 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
} }
/** /**
* tipc_nameseq_subscribe - attach a subscription, and issue * tipc_nameseq_subscribe - attach a subscription, and optionally
* the prescribed number of events if there is any sub- * issue the prescribed number of events if there is any sub-
* sequence overlapping with the requested sequence * sequence overlapping with the requested sequence
*/ */
static void tipc_nameseq_subscribe(struct name_seq *nseq, static void tipc_nameseq_subscribe(struct name_seq *nseq,
struct tipc_subscription *s) struct tipc_subscription *s,
bool status)
{ {
struct sub_seq *sseq = nseq->sseqs; struct sub_seq *sseq = nseq->sseqs;
struct tipc_name_seq ns; struct tipc_name_seq ns;
...@@ -420,7 +421,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq, ...@@ -420,7 +421,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
tipc_subscrp_get(s); tipc_subscrp_get(s);
list_add(&s->nameseq_list, &nseq->subscriptions); list_add(&s->nameseq_list, &nseq->subscriptions);
if (!sseq) if (!status || !sseq)
return; return;
while (sseq != &nseq->sseqs[nseq->first_free]) { while (sseq != &nseq->sseqs[nseq->first_free]) {
...@@ -811,7 +812,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref, ...@@ -811,7 +812,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
/** /**
* tipc_nametbl_subscribe - add a subscription object to the name table * tipc_nametbl_subscribe - add a subscription object to the name table
*/ */
void tipc_nametbl_subscribe(struct tipc_subscription *s) void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{ {
struct tipc_net *tn = net_generic(s->net, tipc_net_id); struct tipc_net *tn = net_generic(s->net, tipc_net_id);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
...@@ -825,7 +826,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s) ...@@ -825,7 +826,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) { if (seq) {
spin_lock_bh(&seq->lock); spin_lock_bh(&seq->lock);
tipc_nameseq_subscribe(seq, s); tipc_nameseq_subscribe(seq, s, status);
spin_unlock_bh(&seq->lock); spin_unlock_bh(&seq->lock);
} else { } else {
tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
......
...@@ -121,7 +121,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type, ...@@ -121,7 +121,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
u32 lower, u32 node, u32 ref, u32 lower, u32 node, u32 ref,
u32 key); u32 key);
void tipc_nametbl_subscribe(struct tipc_subscription *s); void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);
void tipc_nametbl_unsubscribe(struct tipc_subscription *s); void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
int tipc_nametbl_init(struct net *net); int tipc_nametbl_init(struct net *net);
void tipc_nametbl_stop(struct net *net); void tipc_nametbl_stop(struct net *net);
......
...@@ -490,7 +490,7 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) ...@@ -490,7 +490,7 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
} }
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid) u32 filter, u32 lower, u32 upper, int *conid)
{ {
struct tipc_subscriber *scbr; struct tipc_subscriber *scbr;
struct tipc_subscr sub; struct tipc_subscr sub;
...@@ -501,7 +501,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, ...@@ -501,7 +501,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
sub.seq.lower = lower; sub.seq.lower = lower;
sub.seq.upper = upper; sub.seq.upper = upper;
sub.timeout = TIPC_WAIT_FOREVER; sub.timeout = TIPC_WAIT_FOREVER;
sub.filter = TIPC_SUB_PORTS; sub.filter = filter;
*(u32 *)&sub.usr_handle = port; *(u32 *)&sub.usr_handle = port;
con = tipc_alloc_conn(tipc_topsrv(net)); con = tipc_alloc_conn(tipc_topsrv(net));
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include <net/net_namespace.h> #include <net/net_namespace.h>
#define TIPC_SERVER_NAME_LEN 32 #define TIPC_SERVER_NAME_LEN 32
#define TIPC_SUB_NO_STATUS 0x80
/** /**
* struct tipc_server - TIPC server structure * struct tipc_server - TIPC server structure
...@@ -84,7 +85,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, ...@@ -84,7 +85,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
struct sockaddr_tipc *addr, void *data, size_t len); struct sockaddr_tipc *addr, void *data, size_t len);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
u32 lower, u32 upper, int *conid); u32 filter, u32 lower, u32 upper, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid); void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
/** /**
......
...@@ -286,7 +286,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, ...@@ -286,7 +286,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
} }
static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
struct tipc_subscriber *subscriber, int swap) struct tipc_subscriber *subscriber, int swap,
bool status)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_subscription *sub = NULL; struct tipc_subscription *sub = NULL;
...@@ -299,7 +300,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, ...@@ -299,7 +300,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
spin_lock_bh(&subscriber->lock); spin_lock_bh(&subscriber->lock);
list_add(&sub->subscrp_list, &subscriber->subscrp_list); list_add(&sub->subscrp_list, &subscriber->subscrp_list);
sub->subscriber = subscriber; sub->subscriber = subscriber;
tipc_nametbl_subscribe(sub); tipc_nametbl_subscribe(sub, status);
tipc_subscrb_get(subscriber); tipc_subscrb_get(subscriber);
spin_unlock_bh(&subscriber->lock); spin_unlock_bh(&subscriber->lock);
...@@ -323,6 +324,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid, ...@@ -323,6 +324,7 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
{ {
struct tipc_subscriber *subscriber = usr_data; struct tipc_subscriber *subscriber = usr_data;
struct tipc_subscr *s = (struct tipc_subscr *)buf; struct tipc_subscr *s = (struct tipc_subscr *)buf;
bool status;
int swap; int swap;
/* Determine subscriber's endianness */ /* Determine subscriber's endianness */
...@@ -334,8 +336,8 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid, ...@@ -334,8 +336,8 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
return tipc_subscrp_cancel(s, subscriber); return tipc_subscrp_cancel(s, subscriber);
} }
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
tipc_subscrp_subscribe(net, s, subscriber, swap); tipc_subscrp_subscribe(net, s, subscriber, swap, status);
} }
/* Handle one request to establish a new subscriber */ /* Handle one request to establish a new subscriber */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment