Commit c373dac5 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-topology-cleanup'

Ying Xue says:

====================
tipc: cleanup topology server

Not only function names declared in subscr.c are very confused, but
also topology server's locking policy is not designed very well, for
instance, usually leading to panic in some special corner cases.

In this series, we attempt to eliminate the confusion of function names
and simplify topology server's locking policy to solve above mentioned
issues. More importantly, the change will make relevant code easily
understandable and maintainable.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 29a1ff65 90bdfcb7
......@@ -68,7 +68,7 @@ static int __net_init tipc_init_net(struct net *net)
if (err)
goto out_nametbl;
err = tipc_subscr_start(net);
err = tipc_topsrv_start(net);
if (err)
goto out_subscr;
return 0;
......@@ -83,7 +83,7 @@ static int __net_init tipc_init_net(struct net *net)
static void __net_exit tipc_exit_net(struct net *net)
{
tipc_subscr_stop(net);
tipc_topsrv_stop(net);
tipc_net_stop(net);
tipc_nametbl_stop(net);
tipc_sk_rht_destroy(net);
......
......@@ -330,13 +330,9 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
/* Any subscriptions waiting for notification? */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscr_report_overlap(s,
publ->lower,
publ->upper,
TIPC_PUBLISHED,
publ->ref,
publ->node,
created_subseq);
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_PUBLISHED, publ->ref,
publ->node, created_subseq);
}
return publ;
}
......@@ -404,13 +400,9 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
/* Notify any waiting subscriptions */
list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
tipc_subscr_report_overlap(s,
publ->lower,
publ->upper,
TIPC_WITHDRAWN,
publ->ref,
publ->node,
removed_subseq);
tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
TIPC_WITHDRAWN, publ->ref,
publ->node, removed_subseq);
}
return publ;
......@@ -432,18 +424,16 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
return;
while (sseq != &nseq->sseqs[nseq->first_free]) {
if (tipc_subscr_overlap(s, sseq->lower, sseq->upper)) {
if (tipc_subscrp_check_overlap(s, sseq->lower, sseq->upper)) {
struct publication *crs;
struct name_info *info = sseq->info;
int must_report = 1;
list_for_each_entry(crs, &info->zone_list, zone_list) {
tipc_subscr_report_overlap(s,
sseq->lower,
tipc_subscrp_report_overlap(s, sseq->lower,
sseq->upper,
TIPC_PUBLISHED,
crs->ref,
crs->node,
crs->ref, crs->node,
must_report);
must_report = 0;
}
......
......@@ -309,6 +309,10 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
/* Notify that new connection is incoming */
newcon->usr_data = s->tipc_conn_new(newcon->conid);
if (!newcon->usr_data) {
sock_release(newsock);
return -ENOMEM;
}
/* Wake up receive process in case of 'SYN+' message */
newsock->sk->sk_data_ready(newsock->sk);
......
......@@ -40,16 +40,21 @@
/**
* struct tipc_subscriber - TIPC network topology subscriber
* @kref: reference counter to tipc_subscription object
* @conid: connection identifier to server connecting to subscriber
* @lock: control access to subscriber
* @subscription_list: list of subscription objects for this subscriber
* @subscrp_list: list of subscription objects for this subscriber
*/
struct tipc_subscriber {
struct kref kref;
int conid;
spinlock_t lock;
struct list_head subscription_list;
struct list_head subscrp_list;
};
static void tipc_subscrp_delete(struct tipc_subscription *sub);
static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
/**
* htohl - convert value to endianness used by destination
* @in: value to convert
......@@ -62,9 +67,9 @@ static u32 htohl(u32 in, int swap)
return swap ? swab32(in) : in;
}
static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
u32 node)
static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper,
u32 event, u32 port_ref, u32 node)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct tipc_subscriber *subscriber = sub->subscriber;
......@@ -82,11 +87,12 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
}
/**
* tipc_subscr_overlap - test for subscription overlap with the given values
* tipc_subscrp_check_overlap - test for subscription overlap with the
* given values
*
* Returns 1 if there is overlap, otherwise 0.
*/
int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
int tipc_subscrp_check_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper)
{
if (found_lower < sub->seq.lower)
......@@ -98,136 +104,119 @@ int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
return 1;
}
/**
* tipc_subscr_report_overlap - issue event if there is subscription overlap
*
* Protected by nameseq.lock in name_table.c
*/
void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
u32 node, int must)
{
if (!tipc_subscr_overlap(sub, found_lower, found_upper))
if (!tipc_subscrp_check_overlap(sub, found_lower, found_upper))
return;
if (!must && !(sub->filter & TIPC_SUB_PORTS))
return;
subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
node);
}
static void subscr_timeout(unsigned long data)
static void tipc_subscrp_timeout(unsigned long data)
{
struct tipc_subscription *sub = (struct tipc_subscription *)data;
struct tipc_subscriber *subscriber = sub->subscriber;
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
/* The spin lock per subscriber is used to protect its members */
spin_lock_bh(&subscriber->lock);
/* Notify subscriber of timeout */
tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
/* Validate timeout (in case subscription is being cancelled) */
if (sub->timeout == TIPC_WAIT_FOREVER) {
spin_lock_bh(&subscriber->lock);
tipc_subscrp_delete(sub);
spin_unlock_bh(&subscriber->lock);
return;
}
/* Unlink subscription from name table */
tipc_nametbl_unsubscribe(sub);
tipc_subscrb_put(subscriber);
}
/* Unlink subscription from subscriber */
list_del(&sub->subscription_list);
static void tipc_subscrb_kref_release(struct kref *kref)
{
struct tipc_subscriber *subcriber = container_of(kref,
struct tipc_subscriber, kref);
spin_unlock_bh(&subscriber->lock);
kfree(subcriber);
}
/* Notify subscriber of timeout */
subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0);
static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
{
kref_put(&subscriber->kref, tipc_subscrb_kref_release);
}
/* Now destroy subscription */
kfree(sub);
atomic_dec(&tn->subscription_count);
static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
{
kref_get(&subscriber->kref);
}
/**
* subscr_del - delete a subscription within a subscription list
*
* Called with subscriber lock held.
*/
static void subscr_del(struct tipc_subscription *sub)
static struct tipc_subscriber *tipc_subscrb_create(int conid)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct tipc_subscriber *subscriber;
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscription_list);
kfree(sub);
atomic_dec(&tn->subscription_count);
subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
if (!subscriber) {
pr_warn("Subscriber rejected, no memory\n");
return NULL;
}
kref_init(&subscriber->kref);
INIT_LIST_HEAD(&subscriber->subscrp_list);
subscriber->conid = conid;
spin_lock_init(&subscriber->lock);
return subscriber;
}
static void subscr_release(struct tipc_subscriber *subscriber)
static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
struct tipc_subscription *sub, *temp;
spin_lock_bh(&subscriber->lock);
/* Destroy any existing subscriptions for subscriber */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
if (sub->timeout != TIPC_WAIT_FOREVER) {
spin_unlock_bh(&subscriber->lock);
del_timer_sync(&sub->timer);
spin_lock_bh(&subscriber->lock);
list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
subscrp_list) {
if (del_timer(&sub->timer)) {
tipc_subscrp_delete(sub);
tipc_subscrb_put(subscriber);
}
subscr_del(sub);
}
spin_unlock_bh(&subscriber->lock);
/* Now destroy subscriber */
kfree(subscriber);
tipc_subscrb_put(subscriber);
}
/**
* subscr_cancel - handle subscription cancellation request
*
* Called with subscriber lock held. Routine must temporarily release lock
* to enable the subscription timeout routine to finish without deadlocking;
* the lock is then reclaimed to allow caller to release it upon return.
*
* Note that fields of 's' use subscriber's endianness!
*/
static void subscr_cancel(struct tipc_subscr *s,
static void tipc_subscrp_delete(struct tipc_subscription *sub)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscrp_list);
kfree(sub);
atomic_dec(&tn->subscription_count);
}
static void tipc_subscrp_cancel(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
{
struct tipc_subscription *sub;
struct tipc_subscription *sub_temp;
int found = 0;
struct tipc_subscription *sub, *temp;
spin_lock_bh(&subscriber->lock);
/* Find first matching subscription, exit if not found */
list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
subscription_list) {
list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
subscrp_list) {
if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
found = 1;
if (del_timer(&sub->timer)) {
tipc_subscrp_delete(sub);
tipc_subscrb_put(subscriber);
}
break;
}
}
if (!found)
return;
/* Cancel subscription timer (if used), then delete subscription */
if (sub->timeout != TIPC_WAIT_FOREVER) {
sub->timeout = TIPC_WAIT_FOREVER;
spin_unlock_bh(&subscriber->lock);
del_timer_sync(&sub->timer);
spin_lock_bh(&subscriber->lock);
}
subscr_del(sub);
}
/**
* subscr_subscribe - create subscription for subscriber
*
* Called with subscriber lock held.
*/
static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s,
struct tipc_subscriber *subscriber,
struct tipc_subscription **sub_p)
{
......@@ -241,7 +230,7 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
/* Detect & process a subscription cancellation request */
if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
subscr_cancel(s, subscriber);
tipc_subscrp_cancel(s, subscriber);
return 0;
}
......@@ -273,27 +262,30 @@ static int subscr_subscribe(struct net *net, struct tipc_subscr *s,
kfree(sub);
return -EINVAL;
}
list_add(&sub->subscription_list, &subscriber->subscription_list);
spin_lock_bh(&subscriber->lock);
list_add(&sub->subscrp_list, &subscriber->subscrp_list);
spin_unlock_bh(&subscriber->lock);
sub->subscriber = subscriber;
sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
memcpy(&sub->evt.s, s, sizeof(*s));
atomic_inc(&tn->subscription_count);
if (sub->timeout != TIPC_WAIT_FOREVER) {
setup_timer(&sub->timer, subscr_timeout, (unsigned long)sub);
mod_timer(&sub->timer, jiffies + sub->timeout);
}
setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
if (sub->timeout != TIPC_WAIT_FOREVER)
sub->timeout += jiffies;
if (!mod_timer(&sub->timer, sub->timeout))
tipc_subscrb_get(subscriber);
*sub_p = sub;
return 0;
}
/* Handle one termination request for the subscriber */
static void subscr_conn_shutdown_event(int conid, void *usr_data)
static void tipc_subscrb_shutdown_cb(int conid, void *usr_data)
{
subscr_release((struct tipc_subscriber *)usr_data);
tipc_subscrb_delete((struct tipc_subscriber *)usr_data);
}
/* Handle one request to create a new subscription for the subscriber */
static void subscr_conn_msg_event(struct net *net, int conid,
static void tipc_subscrb_rcv_cb(struct net *net, int conid,
struct sockaddr_tipc *addr, void *usr_data,
void *buf, size_t len)
{
......@@ -301,34 +293,20 @@ static void subscr_conn_msg_event(struct net *net, int conid,
struct tipc_subscription *sub = NULL;
struct tipc_net *tn = net_generic(net, tipc_net_id);
spin_lock_bh(&subscriber->lock);
subscr_subscribe(net, (struct tipc_subscr *)buf, subscriber, &sub);
tipc_subscrp_create(net, (struct tipc_subscr *)buf, subscriber, &sub);
if (sub)
tipc_nametbl_subscribe(sub);
else
tipc_conn_terminate(tn->topsrv, subscriber->conid);
spin_unlock_bh(&subscriber->lock);
}
/* Handle one request to establish a new subscriber */
static void *subscr_named_msg_event(int conid)
static void *tipc_subscrb_connect_cb(int conid)
{
struct tipc_subscriber *subscriber;
/* Create subscriber object */
subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
if (subscriber == NULL) {
pr_warn("Subscriber rejected, no memory\n");
return NULL;
}
INIT_LIST_HEAD(&subscriber->subscription_list);
subscriber->conid = conid;
spin_lock_init(&subscriber->lock);
return (void *)subscriber;
return (void *)tipc_subscrb_create(conid);
}
int tipc_subscr_start(struct net *net)
int tipc_topsrv_start(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
const char name[] = "topology_server";
......@@ -355,9 +333,9 @@ int tipc_subscr_start(struct net *net)
topsrv->imp = TIPC_CRITICAL_IMPORTANCE;
topsrv->type = SOCK_SEQPACKET;
topsrv->max_rcvbuf_size = sizeof(struct tipc_subscr);
topsrv->tipc_conn_recvmsg = subscr_conn_msg_event;
topsrv->tipc_conn_new = subscr_named_msg_event;
topsrv->tipc_conn_shutdown = subscr_conn_shutdown_event;
topsrv->tipc_conn_recvmsg = tipc_subscrb_rcv_cb;
topsrv->tipc_conn_new = tipc_subscrb_connect_cb;
topsrv->tipc_conn_shutdown = tipc_subscrb_shutdown_cb;
strncpy(topsrv->name, name, strlen(name) + 1);
tn->topsrv = topsrv;
......@@ -366,7 +344,7 @@ int tipc_subscr_start(struct net *net)
return tipc_server_start(topsrv);
}
void tipc_subscr_stop(struct net *net)
void tipc_topsrv_stop(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_server *topsrv = tn->topsrv;
......
......@@ -54,7 +54,7 @@ struct tipc_subscriber;
* @filter: event filtering to be done for subscription
* @timer: timer governing subscription duration (optional)
* @nameseq_list: adjacent subscriptions in name sequence's subscription list
* @subscription_list: adjacent subscriptions in subscriber's subscription list
* @subscrp_list: adjacent subscriptions in subscriber's subscription list
* @server_ref: object reference of server port associated with subscription
* @swap: indicates if subscriber uses opposite endianness in its messages
* @evt: template for events generated by subscription
......@@ -67,17 +67,17 @@ struct tipc_subscription {
u32 filter;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscription_list;
struct list_head subscrp_list;
int swap;
struct tipc_event evt;
};
int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
int tipc_subscrp_check_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper);
void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
u32 found_upper, u32 event, u32 port_ref,
u32 node, int must);
int tipc_subscr_start(struct net *net);
void tipc_subscr_stop(struct net *net);
void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper, u32 event,
u32 port_ref, u32 node, int must);
int tipc_topsrv_start(struct net *net);
void tipc_topsrv_stop(struct net *net);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment