Commit 2312bf61 authored by Jon Paul Maloy, committed by David S. Miller

tipc: introduce per-link spinlock

As a preparation to allow parallel links to work more independently
from each other we introduce a per-link spinlock, to be stored in the
struct node's link entry area. Since the node lock is still a regular
spinlock, there is no increase in parallelism at this stage.
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 1d7e1c25
...@@ -1995,6 +1995,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) ...@@ -1995,6 +1995,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
struct tipc_node *node; struct tipc_node *node;
struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1]; struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
struct net *net = sock_net(skb->sk); struct net *net = sock_net(skb->sk);
struct tipc_link_entry *le;
if (!info->attrs[TIPC_NLA_LINK]) if (!info->attrs[TIPC_NLA_LINK])
return -EINVAL; return -EINVAL;
...@@ -2020,17 +2021,17 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) ...@@ -2020,17 +2021,17 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
node = tipc_link_find_owner(net, link_name, &bearer_id); node = tipc_link_find_owner(net, link_name, &bearer_id);
if (!node) if (!node)
return -EINVAL; return -EINVAL;
le = &node->links[bearer_id];
tipc_node_lock(node); tipc_node_lock(node);
spin_lock_bh(&le->lock);
link = node->links[bearer_id].link; link = le->link;
if (!link) { if (!link) {
tipc_node_unlock(node); tipc_node_unlock(node);
return -EINVAL; return -EINVAL;
} }
link_reset_statistics(link); link_reset_statistics(link);
spin_unlock_bh(&le->lock);
tipc_node_unlock(node); tipc_node_unlock(node);
return 0; return 0;
......
...@@ -339,11 +339,13 @@ static void tipc_node_timeout(unsigned long data) ...@@ -339,11 +339,13 @@ static void tipc_node_timeout(unsigned long data)
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
tipc_node_lock(n); tipc_node_lock(n);
le = &n->links[bearer_id]; le = &n->links[bearer_id];
spin_lock_bh(&le->lock);
if (le->link) { if (le->link) {
/* Link tolerance may change asynchronously: */ /* Link tolerance may change asynchronously: */
tipc_node_calculate_timer(n, le->link); tipc_node_calculate_timer(n, le->link);
rc = tipc_link_timeout(le->link, &xmitq); rc = tipc_link_timeout(le->link, &xmitq);
} }
spin_unlock_bh(&le->lock);
tipc_node_unlock(n); tipc_node_unlock(n);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
if (rc & TIPC_LINK_DOWN_EVT) if (rc & TIPC_LINK_DOWN_EVT)
...@@ -654,6 +656,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, ...@@ -654,6 +656,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
if (n->state == NODE_FAILINGOVER) if (n->state == NODE_FAILINGOVER)
tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
le->link = l; le->link = l;
spin_lock_init(&le->lock);
n->link_cnt++; n->link_cnt++;
tipc_node_calculate_timer(n, l); tipc_node_calculate_timer(n, l);
if (n->link_cnt == 1) if (n->link_cnt == 1)
...@@ -1033,20 +1036,6 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) ...@@ -1033,20 +1036,6 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
return -EMSGSIZE; return -EMSGSIZE;
} }
/* tipc_node_select_link - resolve the currently active link for a selector
 *
 * @n: node whose link table is consulted
 * @sel: selector value; only its lowest bit chooses between the two
 *       active-link slots
 * @bearer_id: out - bearer id of the chosen link (written only on success)
 * @maddr: out - media address of the chosen link entry (written only on
 *         success)
 *
 * Returns the link pointer from the chosen entry, or NULL when no link is
 * active for this selector. Caller must hold the node lock.
 */
static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
					       int *bearer_id,
					       struct tipc_media_addr **maddr)
{
	int b = n->active_links[sel & 1];

	if (unlikely(b < 0))
		return NULL;

	*bearer_id = b;
	*maddr = &n->links[b].maddr;
	return n->links[b].link;
}
/** /**
* tipc_node_xmit() is the general link level function for message sending * tipc_node_xmit() is the general link level function for message sending
* @net: the applicable net namespace * @net: the applicable net namespace
...@@ -1059,26 +1048,32 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, ...@@ -1059,26 +1048,32 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
int tipc_node_xmit(struct net *net, struct sk_buff_head *list, int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
u32 dnode, int selector) u32 dnode, int selector)
{ {
struct tipc_link *l = NULL; struct tipc_link_entry *le;
struct tipc_node *n; struct tipc_node *n;
struct sk_buff_head xmitq; struct sk_buff_head xmitq;
struct tipc_media_addr *maddr; struct tipc_media_addr *maddr = NULL;
int bearer_id; int bearer_id = -1;
int rc = -EHOSTUNREACH; int rc = -EHOSTUNREACH;
__skb_queue_head_init(&xmitq); __skb_queue_head_init(&xmitq);
n = tipc_node_find(net, dnode); n = tipc_node_find(net, dnode);
if (likely(n)) { if (likely(n)) {
tipc_node_lock(n); tipc_node_lock(n);
l = tipc_node_select_link(n, selector, &bearer_id, &maddr); bearer_id = n->active_links[selector & 1];
if (likely(l)) if (bearer_id >= 0) {
rc = tipc_link_xmit(l, list, &xmitq); le = &n->links[bearer_id];
maddr = &le->maddr;
spin_lock_bh(&le->lock);
if (likely(le->link))
rc = tipc_link_xmit(le->link, list, &xmitq);
spin_unlock_bh(&le->lock);
}
tipc_node_unlock(n); tipc_node_unlock(n);
if (unlikely(rc == -ENOBUFS)) if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false); tipc_node_link_down(n, bearer_id, false);
tipc_node_put(n); tipc_node_put(n);
} }
if (likely(!rc)) { if (likely(!skb_queue_empty(&xmitq))) {
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
return 0; return 0;
} }
...@@ -1374,7 +1369,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) ...@@ -1374,7 +1369,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
/* Check and if necessary update node state */ /* Check and if necessary update node state */
if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
spin_lock_bh(&le->lock);
rc = tipc_link_rcv(le->link, skb, &xmitq); rc = tipc_link_rcv(le->link, skb, &xmitq);
spin_unlock_bh(&le->lock);
skb = NULL; skb = NULL;
} }
unlock: unlock:
......
...@@ -69,6 +69,7 @@ enum { ...@@ -69,6 +69,7 @@ enum {
struct tipc_link_entry { struct tipc_link_entry {
struct tipc_link *link; struct tipc_link *link;
spinlock_t lock; /* per-link */
u32 mtu; u32 mtu;
struct sk_buff_head inputq; struct sk_buff_head inputq;
struct tipc_media_addr maddr; struct tipc_media_addr maddr;
...@@ -86,7 +87,7 @@ struct tipc_bclink_entry { ...@@ -86,7 +87,7 @@ struct tipc_bclink_entry {
* struct tipc_node - TIPC node structure * struct tipc_node - TIPC node structure
* @addr: network address of node * @addr: network address of node
* @ref: reference counter to node object * @ref: reference counter to node object
* @lock: spinlock governing access to structure * @lock: rwlock governing access to structure
* @net: the applicable net namespace * @net: the applicable net namespace
* @hash: links to adjacent nodes in unsorted hash chain * @hash: links to adjacent nodes in unsorted hash chain
* @inputq: pointer to input queue containing messages for msg event * @inputq: pointer to input queue containing messages for msg event
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment