Commit df79d040 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: eliminate struct tipc_subscriber

It is unnecessary to keep two structures, struct tipc_conn and struct
tipc_subscriber, with a one-to-one relationship and still with different
life cycles. The fact that the two often run in different contexts, and
still may access each other via direct pointers constitutes an additional
hazard, something we have experienced at several occasions, and still
see happening.

We have identified at least two remaining problems that are easier to
fix if we simplify the topology server data structure somewhat.

- When there is a race between a subscription up/down event and a
  timeout event, it is fully possible that the former might be delivered
  after the latter, leading to confusion for the receiver.

- The function tipc_subcrp_timeout() is executing in interrupt context,
  while the following call chain is at least theoretically possible:
  tipc_subscrp_timeout()
    tipc_subscrp_send_event()
      tipc_conn_sendmsg()
        conn_put()
          tipc_conn_kref_release()
            sock_release(sock)

I.e., we end up calling a function that might try to sleep in
interrupt context. To eliminate this, we need to ensure that the
tipc_conn structure and the socket, as well as the subscription
instances, only are deleted in work queue context, i.e., after the
timeout event really has been sent out.

We now remove this unnecessary complexity, by merging data and
functionality of the subscriber structure into struct tipc_conn
and the associated file server.c. We thereafter add a spinlock and
a new 'inactive' state to the subscription structure. Using those,
both problems described above can be easily solved.
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent c901d26d
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
* net/tipc/server.c: TIPC server infrastructure * net/tipc/server.c: TIPC server infrastructure
* *
* Copyright (c) 2012-2013, Wind River Systems * Copyright (c) 2012-2013, Wind River Systems
* Copyright (c) 2017, Ericsson AB
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -57,12 +58,13 @@ ...@@ -57,12 +58,13 @@
* @sock: socket handler associated with connection * @sock: socket handler associated with connection
* @flags: indicates connection state * @flags: indicates connection state
* @server: pointer to connected server * @server: pointer to connected server
* @sub_list: lsit to all pertaing subscriptions
* @sub_lock: lock protecting the subscription list
* @outqueue_lock: control access to the outqueue
* @rwork: receive work item * @rwork: receive work item
* @usr_data: user-specified field
* @rx_action: what to do when connection socket is active * @rx_action: what to do when connection socket is active
* @outqueue: pointer to first outbound message in queue * @outqueue: pointer to first outbound message in queue
* @outqueue_lock: control access to the outqueue * @outqueue_lock: control access to the outqueue
* @outqueue: list of connection objects for its server
* @swork: send work item * @swork: send work item
*/ */
struct tipc_conn { struct tipc_conn {
...@@ -71,9 +73,10 @@ struct tipc_conn { ...@@ -71,9 +73,10 @@ struct tipc_conn {
struct socket *sock; struct socket *sock;
unsigned long flags; unsigned long flags;
struct tipc_server *server; struct tipc_server *server;
struct list_head sub_list;
spinlock_t sub_lock; /* for subscription list */
struct work_struct rwork; struct work_struct rwork;
int (*rx_action) (struct tipc_conn *con); int (*rx_action) (struct tipc_conn *con);
void *usr_data;
struct list_head outqueue; struct list_head outqueue;
spinlock_t outqueue_lock; spinlock_t outqueue_lock;
struct work_struct swork; struct work_struct swork;
...@@ -81,6 +84,7 @@ struct tipc_conn { ...@@ -81,6 +84,7 @@ struct tipc_conn {
/* An entry waiting to be sent */ /* An entry waiting to be sent */
struct outqueue_entry { struct outqueue_entry {
u32 evt;
struct list_head list; struct list_head list;
struct kvec iov; struct kvec iov;
}; };
...@@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work); ...@@ -89,18 +93,33 @@ static void tipc_recv_work(struct work_struct *work);
static void tipc_send_work(struct work_struct *work); static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con); static void tipc_clean_outqueues(struct tipc_conn *con);
static bool connected(struct tipc_conn *con)
{
return con && test_bit(CF_CONNECTED, &con->flags);
}
/**
* htohl - convert value to endianness used by destination
* @in: value to convert
* @swap: non-zero if endianness must be reversed
*
* Returns converted value
*/
static u32 htohl(u32 in, int swap)
{
return swap ? swab32(in) : in;
}
static void tipc_conn_kref_release(struct kref *kref) static void tipc_conn_kref_release(struct kref *kref)
{ {
struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
struct tipc_server *s = con->server; struct tipc_server *s = con->server;
struct socket *sock = con->sock; struct socket *sock = con->sock;
struct sock *sk;
if (sock) { if (sock) {
sk = sock->sk;
if (test_bit(CF_SERVER, &con->flags)) { if (test_bit(CF_SERVER, &con->flags)) {
__module_get(sock->ops->owner); __module_get(sock->ops->owner);
__module_get(sk->sk_prot_creator->owner); __module_get(sock->sk->sk_prot_creator->owner);
} }
sock_release(sock); sock_release(sock);
con->sock = NULL; con->sock = NULL;
...@@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid) ...@@ -129,11 +148,8 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
spin_lock_bh(&s->idr_lock); spin_lock_bh(&s->idr_lock);
con = idr_find(&s->conn_idr, conid); con = idr_find(&s->conn_idr, conid);
if (con) { if (!connected(con) || !kref_get_unless_zero(&con->kref))
if (!test_bit(CF_CONNECTED, &con->flags) ||
!kref_get_unless_zero(&con->kref))
con = NULL; con = NULL;
}
spin_unlock_bh(&s->idr_lock); spin_unlock_bh(&s->idr_lock);
return con; return con;
} }
...@@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk) ...@@ -144,7 +160,7 @@ static void sock_data_ready(struct sock *sk)
read_lock_bh(&sk->sk_callback_lock); read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk); con = sock2con(sk);
if (con && test_bit(CF_CONNECTED, &con->flags)) { if (connected(con)) {
conn_get(con); conn_get(con);
if (!queue_work(con->server->rcv_wq, &con->rwork)) if (!queue_work(con->server->rcv_wq, &con->rwork))
conn_put(con); conn_put(con);
...@@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk) ...@@ -158,7 +174,7 @@ static void sock_write_space(struct sock *sk)
read_lock_bh(&sk->sk_callback_lock); read_lock_bh(&sk->sk_callback_lock);
con = sock2con(sk); con = sock2con(sk);
if (con && test_bit(CF_CONNECTED, &con->flags)) { if (connected(con)) {
conn_get(con); conn_get(con);
if (!queue_work(con->server->send_wq, &con->swork)) if (!queue_work(con->server->send_wq, &con->swork))
conn_put(con); conn_put(con);
...@@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con) ...@@ -181,6 +197,24 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
} }
/* tipc_con_delete_sub - delete a specific or all subscriptions
* for a given subscriber
*/
static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{
struct list_head *sub_list = &con->sub_list;
struct tipc_subscription *sub, *tmp;
spin_lock_bh(&con->sub_lock);
list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
tipc_sub_delete(sub);
else if (s)
break;
}
spin_unlock_bh(&con->sub_lock);
}
static void tipc_close_conn(struct tipc_conn *con) static void tipc_close_conn(struct tipc_conn *con)
{ {
struct sock *sk = con->sock->sk; struct sock *sk = con->sock->sk;
...@@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con) ...@@ -188,10 +222,11 @@ static void tipc_close_conn(struct tipc_conn *con)
write_lock_bh(&sk->sk_callback_lock); write_lock_bh(&sk->sk_callback_lock);
disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags); disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
if (disconnect) { if (disconnect) {
sk->sk_user_data = NULL; sk->sk_user_data = NULL;
if (con->conid) if (con->conid)
tipc_subscrb_delete(con->usr_data); tipc_con_delete_sub(con, NULL);
} }
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
...@@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) ...@@ -215,7 +250,9 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
kref_init(&con->kref); kref_init(&con->kref);
INIT_LIST_HEAD(&con->outqueue); INIT_LIST_HEAD(&con->outqueue);
INIT_LIST_HEAD(&con->sub_list);
spin_lock_init(&con->outqueue_lock); spin_lock_init(&con->outqueue_lock);
spin_lock_init(&con->sub_lock);
INIT_WORK(&con->swork, tipc_send_work); INIT_WORK(&con->swork, tipc_send_work);
INIT_WORK(&con->rwork, tipc_recv_work); INIT_WORK(&con->rwork, tipc_recv_work);
...@@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) ...@@ -236,6 +273,35 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con; return con;
} }
int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
void *buf, size_t len)
{
struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
/* Determine subscriber's endianness */
swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
TIPC_SUB_CANCEL));
/* Detect & process a subscription cancellation request */
if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
tipc_con_delete_sub(con, s);
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
if (!sub)
return -1;
spin_lock_bh(&con->sub_lock);
list_add(&sub->subscrp_list, &con->sub_list);
spin_unlock_bh(&con->sub_lock);
return 0;
}
static int tipc_receive_from_sock(struct tipc_conn *con) static int tipc_receive_from_sock(struct tipc_conn *con)
{ {
struct tipc_server *s = con->server; struct tipc_server *s = con->server;
...@@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con) ...@@ -262,9 +328,7 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
} }
read_lock_bh(&sk->sk_callback_lock); read_lock_bh(&sk->sk_callback_lock);
if (test_bit(CF_CONNECTED, &con->flags)) ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
ret = tipc_subscrb_rcv(sock_net(con->sock->sk), con->conid,
con->usr_data, buf, ret);
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
kmem_cache_free(s->rcvbuf_cache, buf); kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0) if (ret < 0)
...@@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con) ...@@ -302,15 +366,6 @@ static int tipc_accept_from_sock(struct tipc_conn *con)
newcon->rx_action = tipc_receive_from_sock; newcon->rx_action = tipc_receive_from_sock;
tipc_register_callbacks(newsock, newcon); tipc_register_callbacks(newsock, newcon);
/* Notify that new connection is incoming */
newcon->usr_data = tipc_subscrb_create(newcon->conid);
if (!newcon->usr_data) {
sock_release(newsock);
conn_put(newcon);
return -ENOMEM;
}
/* Wake up receive process in case of 'SYN+' message */ /* Wake up receive process in case of 'SYN+' message */
newsock->sk->sk_data_ready(newsock->sk); newsock->sk->sk_data_ready(newsock->sk);
return ret; return ret;
...@@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con) ...@@ -427,7 +482,7 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
} }
int tipc_conn_sendmsg(struct tipc_server *s, int conid, int tipc_conn_sendmsg(struct tipc_server *s, int conid,
void *data, size_t len) u32 evt, void *data, size_t len)
{ {
struct outqueue_entry *e; struct outqueue_entry *e;
struct tipc_conn *con; struct tipc_conn *con;
...@@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, ...@@ -436,7 +491,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
if (!con) if (!con)
return -EINVAL; return -EINVAL;
if (!test_bit(CF_CONNECTED, &con->flags)) { if (!connected(con)) {
conn_put(con); conn_put(con);
return 0; return 0;
} }
...@@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, ...@@ -446,7 +501,7 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,
conn_put(con); conn_put(con);
return -ENOMEM; return -ENOMEM;
} }
e->evt = evt;
spin_lock_bh(&con->outqueue_lock); spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue); list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock); spin_unlock_bh(&con->outqueue_lock);
...@@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid) ...@@ -470,10 +525,9 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid) u32 upper, u32 filter, int *conid)
{ {
struct tipc_subscriber *scbr;
struct tipc_subscr sub; struct tipc_subscr sub;
struct tipc_server *s;
struct tipc_conn *con; struct tipc_conn *con;
int rc;
sub.seq.type = type; sub.seq.type = type;
sub.seq.lower = lower; sub.seq.lower = lower;
...@@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, ...@@ -487,32 +541,23 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
return false; return false;
*conid = con->conid; *conid = con->conid;
s = con->server;
scbr = tipc_subscrb_create(*conid);
if (!scbr) {
conn_put(con);
return false;
}
con->usr_data = scbr;
con->sock = NULL; con->sock = NULL;
tipc_subscrb_rcv(net, *conid, scbr, &sub, sizeof(sub)); rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
return true; if (rc < 0)
tipc_close_conn(con);
return !rc;
} }
void tipc_topsrv_kern_unsubscr(struct net *net, int conid) void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{ {
struct tipc_conn *con; struct tipc_conn *con;
struct tipc_server *srv;
con = tipc_conn_lookup(tipc_topsrv(net), conid); con = tipc_conn_lookup(tipc_topsrv(net), conid);
if (!con) if (!con)
return; return;
test_and_clear_bit(CF_CONNECTED, &con->flags); test_and_clear_bit(CF_CONNECTED, &con->flags);
srv = con->server; tipc_con_delete_sub(con, NULL);
if (con->conid)
tipc_subscrb_delete(con->usr_data);
conn_put(con); conn_put(con);
conn_put(con); conn_put(con);
} }
...@@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt) ...@@ -537,7 +582,8 @@ static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
static void tipc_send_to_sock(struct tipc_conn *con) static void tipc_send_to_sock(struct tipc_conn *con)
{ {
struct tipc_server *s = con->server; struct list_head *queue = &con->outqueue;
struct tipc_server *srv = con->server;
struct outqueue_entry *e; struct outqueue_entry *e;
struct tipc_event *evt; struct tipc_event *evt;
struct msghdr msg; struct msghdr msg;
...@@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con) ...@@ -545,16 +591,20 @@ static void tipc_send_to_sock(struct tipc_conn *con)
int ret; int ret;
spin_lock_bh(&con->outqueue_lock); spin_lock_bh(&con->outqueue_lock);
while (test_bit(CF_CONNECTED, &con->flags)) {
e = list_entry(con->outqueue.next, struct outqueue_entry, list); while (!list_empty(queue)) {
if ((struct list_head *) e == &con->outqueue) e = list_first_entry(queue, struct outqueue_entry, list);
break;
spin_unlock_bh(&con->outqueue_lock); spin_unlock_bh(&con->outqueue_lock);
if (con->sock) { if (e->evt == TIPC_SUBSCR_TIMEOUT) {
evt = (struct tipc_event *)e->iov.iov_base;
tipc_con_delete_sub(con, &evt->s);
}
memset(&msg, 0, sizeof(msg)); memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT; msg.msg_flags = MSG_DONTWAIT;
if (con->sock) {
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1, ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len); e->iov.iov_len);
if (ret == -EWOULDBLOCK || ret == 0) { if (ret == -EWOULDBLOCK || ret == 0) {
...@@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con) ...@@ -565,7 +615,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
} }
} else { } else {
evt = e->iov.iov_base; evt = e->iov.iov_base;
tipc_send_kern_top_evt(s->net, evt); tipc_send_kern_top_evt(srv->net, evt);
} }
/* Don't starve users filling buffers */ /* Don't starve users filling buffers */
...@@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con) ...@@ -573,7 +623,6 @@ static void tipc_send_to_sock(struct tipc_conn *con)
cond_resched(); cond_resched();
count = 0; count = 0;
} }
spin_lock_bh(&con->outqueue_lock); spin_lock_bh(&con->outqueue_lock);
list_del(&e->list); list_del(&e->list);
tipc_free_entry(e); tipc_free_entry(e);
...@@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work) ...@@ -591,7 +640,7 @@ static void tipc_recv_work(struct work_struct *work)
struct tipc_conn *con = container_of(work, struct tipc_conn, rwork); struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
int count = 0; int count = 0;
while (test_bit(CF_CONNECTED, &con->flags)) { while (connected(con)) {
if (con->rx_action(con)) if (con->rx_action(con))
break; break;
...@@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work) ...@@ -608,7 +657,7 @@ static void tipc_send_work(struct work_struct *work)
{ {
struct tipc_conn *con = container_of(work, struct tipc_conn, swork); struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
if (test_bit(CF_CONNECTED, &con->flags)) if (connected(con))
tipc_send_to_sock(con); tipc_send_to_sock(con);
conn_put(con); conn_put(con);
......
...@@ -77,7 +77,7 @@ struct tipc_server { ...@@ -77,7 +77,7 @@ struct tipc_server {
}; };
int tipc_conn_sendmsg(struct tipc_server *s, int conid, int tipc_conn_sendmsg(struct tipc_server *s, int conid,
void *data, size_t len); u32 evt, void *data, size_t len);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower, bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid); u32 upper, u32 filter, int *conid);
......
/* /*
* net/tipc/subscr.c: TIPC network topology service * net/tipc/subscr.c: TIPC network topology service
* *
* Copyright (c) 2000-2006, Ericsson AB * Copyright (c) 2000-2017, Ericsson AB
* Copyright (c) 2005-2007, 2010-2013, Wind River Systems * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -38,22 +38,6 @@ ...@@ -38,22 +38,6 @@
#include "name_table.h" #include "name_table.h"
#include "subscr.h" #include "subscr.h"
/**
* struct tipc_subscriber - TIPC network topology subscriber
* @kref: reference counter to tipc_subscription object
* @conid: connection identifier to server connecting to subscriber
* @lock: control access to subscriber
* @subscrp_list: list of subscription objects for this subscriber
*/
struct tipc_subscriber {
struct kref kref;
int conid;
spinlock_t lock;
struct list_head subscrp_list;
};
static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
/** /**
* htohl - convert value to endianness used by destination * htohl - convert value to endianness used by destination
* @in: value to convert * @in: value to convert
...@@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, ...@@ -71,9 +55,10 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 event, u32 port_ref, u32 node) u32 event, u32 port_ref, u32 node)
{ {
struct tipc_net *tn = net_generic(sub->net, tipc_net_id); struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct tipc_subscriber *subscriber = sub->subscriber;
struct kvec msg_sect; struct kvec msg_sect;
if (sub->inactive)
return;
msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event); msg_sect.iov_len = sizeof(struct tipc_event);
sub->evt.event = htohl(event, sub->swap); sub->evt.event = htohl(event, sub->swap);
...@@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub, ...@@ -81,7 +66,7 @@ static void tipc_subscrp_send_event(struct tipc_subscription *sub,
sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap); sub->evt.port.node = htohl(node, sub->swap);
tipc_conn_sendmsg(tn->topsrv, subscriber->conid, tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
msg_sect.iov_base, msg_sect.iov_len); msg_sect.iov_base, msg_sect.iov_len);
} }
...@@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, ...@@ -132,41 +117,22 @@ void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
return; return;
if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE) if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
return; return;
spin_lock(&sub->lock);
tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, tipc_subscrp_send_event(sub, found_lower, found_upper,
node); event, port_ref, node);
spin_unlock(&sub->lock);
} }
static void tipc_subscrp_timeout(struct timer_list *t) static void tipc_subscrp_timeout(struct timer_list *t)
{ {
struct tipc_subscription *sub = from_timer(sub, t, timer); struct tipc_subscription *sub = from_timer(sub, t, timer);
struct tipc_subscriber *subscriber = sub->subscriber; struct tipc_subscr *s = &sub->evt.s;
spin_lock_bh(&subscriber->lock); spin_lock(&sub->lock);
tipc_nametbl_unsubscribe(sub); tipc_subscrp_send_event(sub, s->seq.lower, s->seq.upper,
list_del(&sub->subscrp_list);
spin_unlock_bh(&subscriber->lock);
/* Notify subscriber of timeout */
tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
TIPC_SUBSCR_TIMEOUT, 0, 0); TIPC_SUBSCR_TIMEOUT, 0, 0);
sub->inactive = true;
tipc_subscrp_put(sub); spin_unlock(&sub->lock);
}
static void tipc_subscrb_kref_release(struct kref *kref)
{
kfree(container_of(kref,struct tipc_subscriber, kref));
}
static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
{
kref_put(&subscriber->kref, tipc_subscrb_kref_release);
}
static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
{
kref_get(&subscriber->kref);
} }
static void tipc_subscrp_kref_release(struct kref *kref) static void tipc_subscrp_kref_release(struct kref *kref)
...@@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref) ...@@ -175,11 +141,9 @@ static void tipc_subscrp_kref_release(struct kref *kref)
struct tipc_subscription, struct tipc_subscription,
kref); kref);
struct tipc_net *tn = net_generic(sub->net, tipc_net_id); struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct tipc_subscriber *subscriber = sub->subscriber;
atomic_dec(&tn->subscription_count); atomic_dec(&tn->subscription_count);
kfree(sub); kfree(sub);
tipc_subscrb_put(subscriber);
} }
void tipc_subscrp_put(struct tipc_subscription *subscription) void tipc_subscrp_put(struct tipc_subscription *subscription)
...@@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription) ...@@ -192,68 +156,9 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref); kref_get(&subscription->kref);
} }
/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
* subscriptions for a given subscriber.
*/
static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
struct tipc_subscr *s)
{
struct list_head *subscription_list = &subscriber->subscrp_list;
struct tipc_subscription *sub, *temp;
u32 timeout;
spin_lock_bh(&subscriber->lock);
list_for_each_entry_safe(sub, temp, subscription_list, subscrp_list) {
if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
continue;
timeout = htohl(sub->evt.s.timeout, sub->swap);
if (timeout == TIPC_WAIT_FOREVER || del_timer(&sub->timer)) {
tipc_nametbl_unsubscribe(sub);
list_del(&sub->subscrp_list);
tipc_subscrp_put(sub);
}
if (s)
break;
}
spin_unlock_bh(&subscriber->lock);
}
struct tipc_subscriber *tipc_subscrb_create(int conid)
{
struct tipc_subscriber *subscriber;
subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
if (!subscriber) {
pr_warn("Subscriber rejected, no memory\n");
return NULL;
}
INIT_LIST_HEAD(&subscriber->subscrp_list);
kref_init(&subscriber->kref);
subscriber->conid = conid;
spin_lock_init(&subscriber->lock);
return subscriber;
}
void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
{
tipc_subscrb_subscrp_delete(subscriber, NULL);
tipc_subscrb_put(subscriber);
}
static void tipc_subscrp_cancel(struct tipc_subscr *s,
struct tipc_subscriber *subscriber)
{
tipc_subscrb_get(subscriber);
tipc_subscrb_subscrp_delete(subscriber, s);
tipc_subscrb_put(subscriber);
}
static struct tipc_subscription *tipc_subscrp_create(struct net *net, static struct tipc_subscription *tipc_subscrp_create(struct net *net,
struct tipc_subscr *s, struct tipc_subscr *s,
int swap) int conid, bool swap)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_subscription *sub; struct tipc_subscription *sub;
...@@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, ...@@ -275,6 +180,8 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
/* Initialize subscription object */ /* Initialize subscription object */
sub->net = net; sub->net = net;
sub->conid = conid;
sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) || if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
(htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) { (htohl(s->seq.lower, swap) > htohl(s->seq.upper, swap))) {
pr_warn("Subscription rejected, illegal request\n"); pr_warn("Subscription rejected, illegal request\n");
...@@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net, ...@@ -284,59 +191,39 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
sub->swap = swap; sub->swap = swap;
memcpy(&sub->evt.s, s, sizeof(*s)); memcpy(&sub->evt.s, s, sizeof(*s));
spin_lock_init(&sub->lock);
atomic_inc(&tn->subscription_count); atomic_inc(&tn->subscription_count);
kref_init(&sub->kref); kref_init(&sub->kref);
return sub; return sub;
} }
static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscriber *subscriber, int swap, struct tipc_subscr *s,
int conid, bool swap,
bool status) bool status)
{ {
struct tipc_subscription *sub = NULL; struct tipc_subscription *sub = NULL;
u32 timeout; u32 timeout;
sub = tipc_subscrp_create(net, s, swap); sub = tipc_subscrp_create(net, s, conid, swap);
if (!sub) if (!sub)
return -1; return NULL;
spin_lock_bh(&subscriber->lock);
list_add(&sub->subscrp_list, &subscriber->subscrp_list);
sub->subscriber = subscriber;
tipc_nametbl_subscribe(sub, status); tipc_nametbl_subscribe(sub, status);
tipc_subscrb_get(subscriber);
spin_unlock_bh(&subscriber->lock);
timer_setup(&sub->timer, tipc_subscrp_timeout, 0); timer_setup(&sub->timer, tipc_subscrp_timeout, 0);
timeout = htohl(sub->evt.s.timeout, swap); timeout = htohl(sub->evt.s.timeout, swap);
if (timeout != TIPC_WAIT_FOREVER) if (timeout != TIPC_WAIT_FOREVER)
mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
return 0; return sub;
} }
/* Handle one request to create a new subscription for the subscriber void tipc_sub_delete(struct tipc_subscription *sub)
*/
int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data,
void *buf, size_t len)
{ {
struct tipc_subscriber *subscriber = usr_data; tipc_nametbl_unsubscribe(sub);
struct tipc_subscr *s = (struct tipc_subscr *)buf; if (sub->evt.s.timeout != TIPC_WAIT_FOREVER)
bool status; del_timer_sync(&sub->timer);
int swap; list_del(&sub->subscrp_list);
tipc_subscrp_put(sub);
/* Determine subscriber's endianness */
swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE |
TIPC_SUB_CANCEL));
/* Detect & process a subscription cancellation request */
if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
tipc_subscrp_cancel(s, subscriber);
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
} }
int tipc_topsrv_start(struct net *net) int tipc_topsrv_start(struct net *net)
......
...@@ -43,7 +43,7 @@ ...@@ -43,7 +43,7 @@
#define TIPC_MAX_PUBLICATIONS 65535 #define TIPC_MAX_PUBLICATIONS 65535
struct tipc_subscription; struct tipc_subscription;
struct tipc_subscriber; struct tipc_conn;
/** /**
* struct tipc_subscription - TIPC network topology subscription object * struct tipc_subscription - TIPC network topology subscription object
...@@ -58,19 +58,22 @@ struct tipc_subscriber; ...@@ -58,19 +58,22 @@ struct tipc_subscriber;
*/ */
struct tipc_subscription { struct tipc_subscription {
struct kref kref; struct kref kref;
struct tipc_subscriber *subscriber;
struct net *net; struct net *net;
struct timer_list timer; struct timer_list timer;
struct list_head nameseq_list; struct list_head nameseq_list;
struct list_head subscrp_list; struct list_head subscrp_list;
int swap;
struct tipc_event evt; struct tipc_event evt;
int conid;
bool swap;
bool inactive;
spinlock_t lock; /* serialize up/down and timer events */
}; };
struct tipc_subscriber *tipc_subscrb_create(int conid); struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
void tipc_subscrb_delete(struct tipc_subscriber *subscriber); struct tipc_subscr *s,
int tipc_subscrb_rcv(struct net *net, int conid, void *usr_data, int conid, bool swap,
void *buf, size_t len); bool status);
void tipc_sub_delete(struct tipc_subscription *sub);
int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower, int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
u32 found_upper); u32 found_upper);
void tipc_subscrp_report_overlap(struct tipc_subscription *sub, void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment