Commit 414574a0 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: simplify interaction between subscription and topology connection

The message transmission and reception in the topology server is more
generic than is currently necessary. By basing the funtionality on the
fact that we only send items of type struct tipc_event and always
receive items of struct tipc_subcr we can make several simplifications,
and also get rid of some unnecessary dynamic memory allocations.
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent df79d040
......@@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
*/
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{
struct tipc_net *tn = net_generic(s->net, tipc_net_id);
struct tipc_server *srv = s->server;
struct tipc_net *tn = tipc_net(srv->net);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
int index = hash(type);
struct name_seq *seq;
struct tipc_name_seq ns;
spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(s->net, type);
seq = nametbl_find_seq(srv->net, type);
if (!seq)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) {
......@@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
*/
void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
{
struct tipc_net *tn = net_generic(s->net, tipc_net_id);
struct tipc_server *srv = s->server;
struct tipc_net *tn = tipc_net(srv->net);
struct name_seq *seq;
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
spin_lock_bh(&tn->nametbl_lock);
seq = nametbl_find_seq(s->net, type);
seq = nametbl_find_seq(srv->net, type);
if (seq != NULL) {
spin_lock_bh(&seq->lock);
list_del_init(&s->nameseq_list);
......
......@@ -84,9 +84,9 @@ struct tipc_conn {
/* An entry waiting to be sent */
struct outqueue_entry {
u32 evt;
bool inactive;
struct tipc_event evt;
struct list_head list;
struct kvec iov;
};
static void tipc_recv_work(struct work_struct *work);
......@@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
return con;
}
/* sock_data_ready - interrupt callback indicating the socket has data to read
* The queued job is launched in tipc_recv_from_sock()
*/
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
......@@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock);
}
/* sock_write_space - interrupt callback after a sendmsg EAGAIN
* Indicates that there now is more is space in the send buffer
* The queued job is launched in tipc_send_to_sock()
*/
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
......@@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con;
}
int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
void *buf, size_t len)
static int tipc_con_rcv_sub(struct tipc_server *srv,
struct tipc_conn *con,
struct tipc_subscr *s)
{
struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
......@@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;
......@@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
static int tipc_receive_from_sock(struct tipc_conn *con)
{
struct tipc_server *s = con->server;
struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
struct tipc_subscr s;
struct kvec iov;
void *buf;
int ret;
buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
if (!buf) {
ret = -ENOMEM;
goto out_close;
}
iov.iov_base = buf;
iov.iov_len = s->max_rcvbuf_size;
iov.iov_base = &s;
iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
if (ret <= 0) {
kmem_cache_free(s->rcvbuf_cache, buf);
goto out_close;
}
if (ret == -EWOULDBLOCK)
return -EWOULDBLOCK;
if (ret > 0) {
read_lock_bh(&sk->sk_callback_lock);
ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
ret = tipc_con_rcv_sub(srv, con, &s);
read_unlock_bh(&sk->sk_callback_lock);
kmem_cache_free(s->rcvbuf_cache, buf);
}
if (ret < 0)
tipc_conn_terminate(s, con->conid);
return ret;
out_close:
if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
else if (ret == 0)
/* Don't return success if we really got EOF */
ret = -EAGAIN;
return ret;
}
......@@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s)
return 0;
}
static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
{
struct outqueue_entry *entry;
void *buf;
entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
if (!entry)
return NULL;
buf = kmemdup(data, len, GFP_ATOMIC);
if (!buf) {
kfree(entry);
return NULL;
}
entry->iov.iov_base = buf;
entry->iov.iov_len = len;
return entry;
}
static void tipc_free_entry(struct outqueue_entry *e)
{
kfree(e->iov.iov_base);
kfree(e);
}
static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;
......@@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
tipc_free_entry(e);
kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
u32 evt, void *data, size_t len)
/* tipc_conn_queue_evt - interrupt level call from a subscription instance
* The queued job is launched in tipc_send_to_sock()
*/
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (!con)
return -EINVAL;
return;
if (!connected(con)) {
conn_put(con);
return 0;
}
if (!connected(con))
goto err;
e = tipc_alloc_entry(data, len);
if (!e) {
conn_put(con);
return -ENOMEM;
}
e->evt = evt;
e = kmalloc(sizeof(*e), GFP_ATOMIC);
if (!e)
goto err;
e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
if (!queue_work(s->send_wq, &con->swork))
conn_put(con);
return 0;
}
void tipc_conn_terminate(struct tipc_server *s, int conid)
{
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (con) {
tipc_close_conn(con);
if (queue_work(s->send_wq, &con->swork))
return;
err:
conn_put(con);
}
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
......@@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
*conid = con->conid;
con->sock = NULL;
rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
......@@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
struct kvec iov;
int count = 0;
int ret;
......@@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con)
while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);
evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);
if (e->evt == TIPC_SUBSCR_TIMEOUT) {
evt = (struct tipc_event *)e->iov.iov_base;
if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
}
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
iov.iov_base = evt;
iov.iov_len = sizeof(*evt);
msg.msg_name = NULL;
if (con->sock) {
ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
e->iov.iov_len);
ret = kernel_sendmsg(con->sock, &msg, &iov,
1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
goto send_err;
goto err;
}
} else {
evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}
......@@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con)
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
tipc_free_entry(e);
kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;
send_err:
err:
tipc_close_conn(con);
}
......@@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s)
idr_init(&s->conn_idr);
s->idr_in_use = 0;
s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
0, SLAB_HWCACHE_ALIGN, NULL);
if (!s->rcvbuf_cache)
return -ENOMEM;
ret = tipc_work_start(s);
if (ret < 0) {
kmem_cache_destroy(s->rcvbuf_cache);
if (ret < 0)
return ret;
}
ret = tipc_open_listening_sock(s);
if (ret < 0) {
if (ret < 0)
tipc_work_stop(s);
kmem_cache_destroy(s->rcvbuf_cache);
return ret;
}
return ret;
}
......@@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s)
spin_unlock_bh(&s->idr_lock);
tipc_work_stop(s);
kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}
......@@ -36,6 +36,7 @@
#ifndef _TIPC_SERVER_H
#define _TIPC_SERVER_H
#include "core.h"
#include <linux/idr.h>
#include <linux/tipc.h>
#include <net/net_namespace.h>
......@@ -68,7 +69,6 @@ struct tipc_server {
spinlock_t idr_lock;
int idr_in_use;
struct net *net;
struct kmem_cache *rcvbuf_cache;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
......@@ -76,19 +76,13 @@ struct tipc_server {
char name[TIPC_SERVER_NAME_LEN];
};
int tipc_conn_sendmsg(struct tipc_server *s, int conid,
u32 evt, void *data, size_t len);
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
u32 event, struct tipc_event *evt);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
/**
* tipc_conn_terminate - terminate connection with server
*
* Note: Must call it in process context since it might sleep
*/
void tipc_conn_terminate(struct tipc_server *s, int conid);
int tipc_server_start(struct tipc_server *s);
void tipc_server_stop(struct tipc_server *s);
......
......@@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap)
static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper,
u32 event, u32 port_ref, u32 node)
u32 event, u32 port, u32 node)
{
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct kvec msg_sect;
struct tipc_event *evt = &sub->evt;
bool swap = sub->swap;
if (sub->inactive)
return;
msg_sect.iov_base = (void *)&sub->evt;
msg_sect.iov_len = sizeof(struct tipc_event);
sub->evt.event = htohl(event, sub->swap);
sub->evt.found_lower = htohl(found_lower, sub->swap);
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
msg_sect.iov_base, msg_sect.iov_len);
evt->event = htohl(event, swap);
evt->found_lower = htohl(found_lower, swap);
evt->found_upper = htohl(found_upper, swap);
evt->port.ref = htohl(port, swap);
evt->port.node = htohl(node, swap);
tipc_conn_queue_evt(sub->server, sub->conid, event, evt);
}
/**
......@@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t)
static void tipc_subscrp_kref_release(struct kref *kref)
{
struct tipc_subscription *sub = container_of(kref,
struct tipc_subscription,
kref);
struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
struct tipc_subscription *sub;
struct tipc_net *tn;
sub = container_of(kref, struct tipc_subscription, kref);
tn = tipc_net(sub->server->net);
atomic_dec(&tn->subscription_count);
kfree(sub);
......@@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref);
}
static struct tipc_subscription *tipc_subscrp_create(struct net *net,
static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub;
u32 filter = htohl(s->filter, swap);
......@@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
}
/* Initialize subscription object */
sub->net = net;
sub->server = srv;
sub->conid = conid;
sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
......@@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
return sub;
}
struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status)
......@@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscription *sub = NULL;
u32 timeout;
sub = tipc_subscrp_create(net, s, conid, swap);
sub = tipc_subscrp_create(srv, s, conid, swap);
if (!sub)
return NULL;
......
......@@ -49,7 +49,6 @@ struct tipc_conn;
* struct tipc_subscription - TIPC network topology subscription object
* @subscriber: pointer to its subscriber
* @seq: name sequence associated with subscription
* @net: point to network namespace
* @timer: timer governing subscription duration (optional)
* @nameseq_list: adjacent subscriptions in name sequence's subscription list
* @subscrp_list: adjacent subscriptions in subscriber's subscription list
......@@ -58,7 +57,7 @@ struct tipc_conn;
*/
struct tipc_subscription {
struct kref kref;
struct net *net;
struct tipc_server *server;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscrp_list;
......@@ -69,7 +68,7 @@ struct tipc_subscription {
spinlock_t lock; /* serialize up/down and timer events */
};
struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment