Commit 0080d4f5 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Ying Xue says:

====================
purge tipc_net_lock

Now tipc routing hierarchy comprises the structures 'node', 'link'and
'bearer'. The whole hierarchy is protected by a big read/write lock,
tipc_net_lock, to ensure that nothing is added or removed while code
is accessing any of these structures. Obviously the locking policy
makes node, link and bearer components closely bound together so that
their relationship becomes unnecessarily complex. In the worst case,
such locking policy not only has a negative influence on performance,
but also it's prone to lead to deadlock occasionally.

In order o decouple the complex relationship between bearer and node
as well as link, the locking policy is adjusted as follows:

- Bearer level
  RTNL lock is used on update side, and RCU is used on read side.
  Meanwhile, all bearer instances including broadcast bearer are
  saved into bearer_list array.

- Node and link level
  All node instances are saved into two tipc_node_list and node_htable
  lists. The two lists are protected by node_list_lock on write side,
  and they are guarded with RCU lock on read side. All members in node
  structure including link instances are protected by node spin lock.

- The relationship between bearer and node
  When link accesses bearer, it first needs to find the bearer with
  its bearer identity from the bearer_list array. When bearer accesses
  node, it can iterate the node_htable hash list with the node address
  to find the corresponding node.

In the new locking policy, every component has its private locking
solution and the relationship between bearer and node is very simple,
that is, they can find each other with node address or bearer identity
from node_htable hash list or bearer_list array.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 5a9d19ab a8b9b96e
......@@ -112,6 +112,8 @@ const char tipc_bclink_name[] = "broadcast-link";
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
static u32 bcbuf_acks(struct sk_buff *buf)
{
......@@ -273,7 +275,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
/**
* tipc_bclink_update_link_state - update broadcast link state
*
* tipc_net_lock and node lock set
* RCU and node lock set
*/
void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
......@@ -321,7 +323,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
: n_ptr->bclink.last_sent);
spin_lock_bh(&bc_lock);
tipc_bearer_send(&bcbearer->bearer, buf, NULL);
tipc_bearer_send(MAX_BEARERS, buf, NULL);
bcl->stats.sent_nacks++;
spin_unlock_bh(&bc_lock);
kfree_skb(buf);
......@@ -335,8 +337,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
*
* Delay any upcoming NACK by this node if another node has already
* requested the first message this node is going to ask for.
*
* Only tipc_net_lock set.
*/
static void bclink_peek_nack(struct tipc_msg *msg)
{
......@@ -408,7 +408,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
/**
* tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
*
* tipc_net_lock is read_locked, no other locks set
* RCU is locked, no other locks set
*/
void tipc_bclink_rcv(struct sk_buff *buf)
{
......@@ -627,13 +627,13 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
if (bp_index == 0) {
/* Use original buffer for first bearer */
tipc_bearer_send(b, buf, &b->bcast_addr);
tipc_bearer_send(b->identity, buf, &b->bcast_addr);
} else {
/* Avoid concurrent buffer access */
tbuf = pskb_copy(buf, GFP_ATOMIC);
if (!tbuf)
break;
tipc_bearer_send(b, tbuf, &b->bcast_addr);
tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
kfree_skb(tbuf); /* Bearer keeps a clone */
}
......@@ -655,20 +655,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/
void tipc_bcbearer_sort(void)
void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
{
struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
struct tipc_bcbearer_pair *bp_curr;
struct tipc_bearer *b;
int b_index;
int pri;
spin_lock_bh(&bc_lock);
if (action)
tipc_nmap_add(nm_ptr, node);
else
tipc_nmap_remove(nm_ptr, node);
/* Group bearers by priority (can assume max of two per priority) */
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
rcu_read_lock();
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
struct tipc_bearer *b = bearer_list[b_index];
b = rcu_dereference_rtnl(bearer_list[b_index]);
if (!b || !b->nodes.count)
continue;
......@@ -677,6 +684,7 @@ void tipc_bcbearer_sort(void)
else
bp_temp[b->priority].secondary = b;
}
rcu_read_unlock();
/* Create array of bearer pairs for broadcasting */
bp_curr = bcbearer->bpairs;
......@@ -783,8 +791,8 @@ void tipc_bclink_init(void)
bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
bcl->b_ptr = &bcbearer->bearer;
bearer_list[BCBEARER] = &bcbearer->bearer;
bcl->bearer_id = MAX_BEARERS;
rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
}
......@@ -795,16 +803,15 @@ void tipc_bclink_stop(void)
tipc_link_purge_queues(bcl);
spin_unlock_bh(&bc_lock);
bearer_list[BCBEARER] = NULL;
RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
memset(bclink, 0, sizeof(*bclink));
memset(bcbearer, 0, sizeof(*bcbearer));
}
/**
* tipc_nmap_add - add a node to a node map
*/
void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
......@@ -819,7 +826,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
/**
* tipc_nmap_remove - remove a node from a node map
*/
void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
......
......@@ -69,9 +69,6 @@ struct tipc_node;
extern const char tipc_bclink_name[];
void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
/**
* tipc_nmap_equal - test for equality of node maps
*/
......@@ -98,6 +95,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
int tipc_bclink_stats(char *stats_buf, const u32 buf_size);
int tipc_bclink_reset_stats(void);
int tipc_bclink_set_queue_limits(u32 limit);
void tipc_bcbearer_sort(void);
void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
#endif
......@@ -49,7 +49,7 @@ static struct tipc_media * const media_info_array[] = {
NULL
};
struct tipc_bearer *bearer_list[MAX_BEARERS + 1];
struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);
......@@ -178,7 +178,7 @@ struct tipc_bearer *tipc_bearer_find(const char *name)
u32 i;
for (i = 0; i < MAX_BEARERS; i++) {
b_ptr = bearer_list[i];
b_ptr = rtnl_dereference(bearer_list[i]);
if (b_ptr && (!strcmp(b_ptr->name, name)))
return b_ptr;
}
......@@ -198,10 +198,9 @@ struct sk_buff *tipc_bearer_get_names(void)
if (!buf)
return NULL;
read_lock_bh(&tipc_net_lock);
for (i = 0; media_info_array[i] != NULL; i++) {
for (j = 0; j < MAX_BEARERS; j++) {
b = bearer_list[j];
b = rtnl_dereference(bearer_list[j]);
if (!b)
continue;
if (b->media == media_info_array[i]) {
......@@ -211,22 +210,33 @@ struct sk_buff *tipc_bearer_get_names(void)
}
}
}
read_unlock_bh(&tipc_net_lock);
return buf;
}
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest)
void tipc_bearer_add_dest(u32 bearer_id, u32 dest)
{
tipc_nmap_add(&b_ptr->nodes, dest);
tipc_bcbearer_sort();
struct tipc_bearer *b_ptr;
rcu_read_lock();
b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
if (b_ptr) {
tipc_bcbearer_sort(&b_ptr->nodes, dest, true);
tipc_disc_add_dest(b_ptr->link_req);
}
rcu_read_unlock();
}
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
void tipc_bearer_remove_dest(u32 bearer_id, u32 dest)
{
tipc_nmap_remove(&b_ptr->nodes, dest);
tipc_bcbearer_sort();
struct tipc_bearer *b_ptr;
rcu_read_lock();
b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
if (b_ptr) {
tipc_bcbearer_sort(&b_ptr->nodes, dest, false);
tipc_disc_remove_dest(b_ptr->link_req);
}
rcu_read_unlock();
}
/**
......@@ -271,13 +281,11 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
return -EINVAL;
}
write_lock_bh(&tipc_net_lock);
m_ptr = tipc_media_find(b_names.media_name);
if (!m_ptr) {
pr_warn("Bearer <%s> rejected, media <%s> not registered\n",
name, b_names.media_name);
goto exit;
return -EINVAL;
}
if (priority == TIPC_MEDIA_LINK_PRI)
......@@ -287,7 +295,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
bearer_id = MAX_BEARERS;
with_this_prio = 1;
for (i = MAX_BEARERS; i-- != 0; ) {
b_ptr = bearer_list[i];
b_ptr = rtnl_dereference(bearer_list[i]);
if (!b_ptr) {
bearer_id = i;
continue;
......@@ -295,14 +303,14 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
if (!strcmp(name, b_ptr->name)) {
pr_warn("Bearer <%s> rejected, already enabled\n",
name);
goto exit;
return -EINVAL;
}
if ((b_ptr->priority == priority) &&
(++with_this_prio > 2)) {
if (priority-- == 0) {
pr_warn("Bearer <%s> rejected, duplicate priority\n",
name);
goto exit;
return -EINVAL;
}
pr_warn("Bearer <%s> priority adjustment required %u->%u\n",
name, priority + 1, priority);
......@@ -312,21 +320,20 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
if (bearer_id >= MAX_BEARERS) {
pr_warn("Bearer <%s> rejected, bearer limit reached (%u)\n",
name, MAX_BEARERS);
goto exit;
return -EINVAL;
}
b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC);
if (!b_ptr) {
res = -ENOMEM;
goto exit;
}
if (!b_ptr)
return -ENOMEM;
strcpy(b_ptr->name, name);
b_ptr->media = m_ptr;
res = m_ptr->enable_media(b_ptr);
if (res) {
pr_warn("Bearer <%s> rejected, enable failure (%d)\n",
name, -res);
goto exit;
return -EINVAL;
}
b_ptr->identity = bearer_id;
......@@ -341,16 +348,14 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
bearer_disable(b_ptr, false);
pr_warn("Bearer <%s> rejected, discovery object creation failed\n",
name);
goto exit;
return -EINVAL;
}
bearer_list[bearer_id] = b_ptr;
rcu_assign_pointer(bearer_list[bearer_id], b_ptr);
pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
name,
tipc_addr_string_fill(addr_string, disc_domain), priority);
exit:
write_unlock_bh(&tipc_net_lock);
return res;
}
......@@ -359,19 +364,16 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
*/
static int tipc_reset_bearer(struct tipc_bearer *b_ptr)
{
read_lock_bh(&tipc_net_lock);
pr_info("Resetting bearer <%s>\n", b_ptr->name);
tipc_disc_delete(b_ptr->link_req);
tipc_link_reset_list(b_ptr->identity);
tipc_disc_create(b_ptr, &b_ptr->bcast_addr);
read_unlock_bh(&tipc_net_lock);
tipc_disc_reset(b_ptr);
return 0;
}
/**
* bearer_disable
*
* Note: This routine assumes caller holds tipc_net_lock.
* Note: This routine assumes caller holds RTNL lock.
*/
static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
{
......@@ -385,12 +387,12 @@ static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
tipc_disc_delete(b_ptr->link_req);
for (i = 0; i < MAX_BEARERS; i++) {
if (b_ptr == bearer_list[i]) {
bearer_list[i] = NULL;
if (b_ptr == rtnl_dereference(bearer_list[i])) {
RCU_INIT_POINTER(bearer_list[i], NULL);
break;
}
}
kfree(b_ptr);
kfree_rcu(b_ptr, rcu);
}
int tipc_disable_bearer(const char *name)
......@@ -398,7 +400,6 @@ int tipc_disable_bearer(const char *name)
struct tipc_bearer *b_ptr;
int res;
write_lock_bh(&tipc_net_lock);
b_ptr = tipc_bearer_find(name);
if (b_ptr == NULL) {
pr_warn("Attempt to disable unknown bearer <%s>\n", name);
......@@ -407,7 +408,6 @@ int tipc_disable_bearer(const char *name)
bearer_disable(b_ptr, false);
res = 0;
}
write_unlock_bh(&tipc_net_lock);
return res;
}
......@@ -444,7 +444,7 @@ int tipc_enable_l2_media(struct tipc_bearer *b)
return -ENODEV;
/* Associate TIPC bearer with Ethernet bearer */
b->media_ptr = dev;
rcu_assign_pointer(b->media_ptr, dev);
memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value));
memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len);
b->bcast_addr.media_id = b->media->type_id;
......@@ -463,8 +463,12 @@ int tipc_enable_l2_media(struct tipc_bearer *b)
*/
void tipc_disable_l2_media(struct tipc_bearer *b)
{
struct net_device *dev = (struct net_device *)b->media_ptr;
struct net_device *dev;
dev = (struct net_device *)rtnl_dereference(b->media_ptr);
RCU_INIT_POINTER(b->media_ptr, NULL);
RCU_INIT_POINTER(dev->tipc_ptr, NULL);
synchronize_net();
dev_put(dev);
}
......@@ -478,8 +482,12 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
struct tipc_media_addr *dest)
{
struct sk_buff *clone;
struct net_device *dev;
int delta;
struct net_device *dev = (struct net_device *)b->media_ptr;
dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr);
if (!dev)
return 0;
clone = skb_clone(buf, GFP_ATOMIC);
if (!clone)
......@@ -507,10 +515,16 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
*/
void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
b->media->send_msg(buf, b, dest);
struct tipc_bearer *b_ptr;
rcu_read_lock();
b_ptr = rcu_dereference_rtnl(bearer_list[bearer_id]);
if (likely(b_ptr))
b_ptr->media->send_msg(buf, b_ptr, dest);
rcu_read_unlock();
}
/**
......@@ -535,7 +549,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev,
}
rcu_read_lock();
b_ptr = rcu_dereference(dev->tipc_ptr);
b_ptr = rcu_dereference_rtnl(dev->tipc_ptr);
if (likely(b_ptr)) {
if (likely(buf->pkt_type <= PACKET_BROADCAST)) {
buf->next = NULL;
......@@ -568,12 +582,9 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
if (!net_eq(dev_net(dev), &init_net))
return NOTIFY_DONE;
rcu_read_lock();
b_ptr = rcu_dereference(dev->tipc_ptr);
if (!b_ptr) {
rcu_read_unlock();
b_ptr = rtnl_dereference(dev->tipc_ptr);
if (!b_ptr)
return NOTIFY_DONE;
}
b_ptr->mtu = dev->mtu;
......@@ -592,11 +603,9 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,
break;
case NETDEV_UNREGISTER:
case NETDEV_CHANGENAME:
tipc_disable_bearer(b_ptr->name);
bearer_disable(b_ptr, false);
break;
}
rcu_read_unlock();
return NOTIFY_OK;
}
......@@ -633,7 +642,7 @@ void tipc_bearer_stop(void)
u32 i;
for (i = 0; i < MAX_BEARERS; i++) {
b_ptr = bearer_list[i];
b_ptr = rtnl_dereference(bearer_list[i]);
if (b_ptr) {
bearer_disable(b_ptr, true);
bearer_list[i] = NULL;
......
......@@ -113,6 +113,7 @@ struct tipc_media {
* @name: bearer name (format = media:interface)
* @media: ptr to media structure associated with bearer
* @bcast_addr: media address used in broadcasting
* @rcu: rcu struct for tipc_bearer
* @priority: default link priority for bearer
* @window: default window size for bearer
* @tolerance: default link tolerance for bearer
......@@ -127,12 +128,13 @@ struct tipc_media {
* care of initializing all other fields.
*/
struct tipc_bearer {
void *media_ptr; /* initalized by media */
void __rcu *media_ptr; /* initalized by media */
u32 mtu; /* initalized by media */
struct tipc_media_addr addr; /* initalized by media */
char name[TIPC_MAX_BEARER_NAME];
struct tipc_media *media;
struct tipc_media_addr bcast_addr;
struct rcu_head rcu;
u32 priority;
u32 window;
u32 tolerance;
......@@ -150,7 +152,7 @@ struct tipc_bearer_names {
struct tipc_link;
extern struct tipc_bearer *bearer_list[];
extern struct tipc_bearer __rcu *bearer_list[];
/*
* TIPC routines available to supported media types
......@@ -181,14 +183,14 @@ int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
struct tipc_media_addr *dest);
struct sk_buff *tipc_bearer_get_names(void);
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_add_dest(u32 bearer_id, u32 dest);
void tipc_bearer_remove_dest(u32 bearer_id, u32 dest);
struct tipc_bearer *tipc_bearer_find(const char *name);
struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(void);
void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
void tipc_bearer_send(u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
#endif /* _TIPC_BEARER_H */
......@@ -42,8 +42,6 @@
#define REPLY_TRUNCATED "<truncated>\n"
static DEFINE_MUTEX(config_mutex);
static const void *req_tlv_area; /* request message TLV area */
static int req_tlv_space; /* request message TLV area size */
static int rep_headroom; /* reply message headroom to use */
......@@ -223,7 +221,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
{
struct sk_buff *rep_tlv_buf;
mutex_lock(&config_mutex);
rtnl_lock();
/* Save request and reply details in a well-known location */
req_tlv_area = request_area;
......@@ -337,6 +335,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
/* Return reply buffer */
exit:
mutex_unlock(&config_mutex);
rtnl_unlock();
return rep_tlv_buf;
}
......@@ -56,7 +56,7 @@
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/rtnetlink.h>
#define TIPC_MOD_VER "2.0.0"
......
......@@ -46,8 +46,9 @@
/**
* struct tipc_link_req - information about an ongoing link setup request
* @bearer: bearer issuing requests
* @bearer_id: identity of bearer issuing requests
* @dest: destination address for request messages
* @domain: network domain to which links can be established
* @num_nodes: number of nodes currently discovered (i.e. with an active link)
* @lock: spinlock for controlling access to requests
* @buf: request message to be (repeatedly) sent
......@@ -55,8 +56,9 @@
* @timer_intv: current interval between requests (in ms)
*/
struct tipc_link_req {
struct tipc_bearer *bearer;
u32 bearer_id;
struct tipc_media_addr dest;
u32 domain;
int num_nodes;
spinlock_t lock;
struct sk_buff *buf;
......@@ -69,13 +71,12 @@ struct tipc_link_req {
* @type: message type (request or response)
* @b_ptr: ptr to bearer issuing message
*/
static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr)
static void tipc_disc_init_msg(struct sk_buff *buf, u32 type,
struct tipc_bearer *b_ptr)
{
struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
struct tipc_msg *msg;
u32 dest_domain = b_ptr->domain;
if (buf) {
msg = buf_msg(buf);
tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
msg_set_non_seq(msg, 1);
......@@ -83,8 +84,6 @@ static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr)
msg_set_dest_domain(msg, dest_domain);
msg_set_bc_netid(msg, tipc_net_id);
b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg));
}
return buf;
}
/**
......@@ -239,9 +238,10 @@ void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr)
link_fully_up = link_working_working(link);
if ((type == DSC_REQ_MSG) && !link_fully_up) {
rbuf = tipc_disc_init_msg(DSC_RESP_MSG, b_ptr);
rbuf = tipc_buf_acquire(INT_H_SIZE);
if (rbuf) {
tipc_bearer_send(b_ptr, rbuf, &media_addr);
tipc_disc_init_msg(rbuf, DSC_RESP_MSG, b_ptr);
tipc_bearer_send(b_ptr->identity, rbuf, &media_addr);
kfree_skb(rbuf);
}
}
......@@ -303,7 +303,7 @@ static void disc_timeout(struct tipc_link_req *req)
spin_lock_bh(&req->lock);
/* Stop searching if only desired node has been found */
if (tipc_node(req->bearer->domain) && req->num_nodes) {
if (tipc_node(req->domain) && req->num_nodes) {
req->timer_intv = TIPC_LINK_REQ_INACTIVE;
goto exit;
}
......@@ -315,7 +315,7 @@ static void disc_timeout(struct tipc_link_req *req)
* hold at fast polling rate if don't have any associated nodes,
* otherwise hold at slow polling rate
*/
tipc_bearer_send(req->bearer, req->buf, &req->dest);
tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
req->timer_intv *= 2;
......@@ -347,21 +347,21 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest)
if (!req)
return -ENOMEM;
req->buf = tipc_disc_init_msg(DSC_REQ_MSG, b_ptr);
if (!req->buf) {
kfree(req);
return -ENOMSG;
}
req->buf = tipc_buf_acquire(INT_H_SIZE);
if (!req->buf)
return -ENOMEM;
tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr);
memcpy(&req->dest, dest, sizeof(*dest));
req->bearer = b_ptr;
req->bearer_id = b_ptr->identity;
req->domain = b_ptr->domain;
req->num_nodes = 0;
req->timer_intv = TIPC_LINK_REQ_INIT;
spin_lock_init(&req->lock);
k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req);
k_start_timer(&req->timer, req->timer_intv);
b_ptr->link_req = req;
tipc_bearer_send(req->bearer, req->buf, &req->dest);
tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
return 0;
}
......@@ -376,3 +376,23 @@ void tipc_disc_delete(struct tipc_link_req *req)
kfree_skb(req->buf);
kfree(req);
}
/**
* tipc_disc_reset - reset object to send periodic link setup requests
* @b_ptr: ptr to bearer issuing requests
* @dest_domain: network domain to which links can be established
*/
void tipc_disc_reset(struct tipc_bearer *b_ptr)
{
struct tipc_link_req *req = b_ptr->link_req;
spin_lock_bh(&req->lock);
tipc_disc_init_msg(req->buf, DSC_REQ_MSG, b_ptr);
req->bearer_id = b_ptr->identity;
req->domain = b_ptr->domain;
req->num_nodes = 0;
req->timer_intv = TIPC_LINK_REQ_INIT;
k_start_timer(&req->timer, req->timer_intv);
tipc_bearer_send(req->bearer_id, req->buf, &req->dest);
spin_unlock_bh(&req->lock);
}
......@@ -41,6 +41,7 @@ struct tipc_link_req;
int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest);
void tipc_disc_delete(struct tipc_link_req *req);
void tipc_disc_reset(struct tipc_bearer *b_ptr);
void tipc_disc_add_dest(struct tipc_link_req *req);
void tipc_disc_remove_dest(struct tipc_link_req *req);
void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr);
......
This diff is collapsed.
......@@ -107,7 +107,7 @@ struct tipc_stats {
* @checkpoint: reference point for triggering link continuity checking
* @peer_session: link session # being used by peer end of link
* @peer_bearer_id: bearer id used by link's peer endpoint
* @b_ptr: pointer to bearer used by link
* @bearer_id: local bearer id used by link
* @tolerance: minimum link continuity loss needed to reset link [in ms]
* @continuity_interval: link continuity testing interval [in ms]
* @abort_limit: # of unacknowledged continuity probes needed to reset link
......@@ -116,6 +116,7 @@ struct tipc_stats {
* @proto_msg: template for control messages generated by link
* @pmsg: convenience pointer to "proto_msg" field
* @priority: current link priority
* @net_plane: current link network plane ('A' through 'H')
* @queue_limit: outbound message queue congestion thresholds (indexed by user)
* @exp_msg_count: # of tunnelled messages expected during link changeover
* @reset_checkpoint: seq # of last acknowledged message at time of link reset
......@@ -155,7 +156,7 @@ struct tipc_link {
u32 checkpoint;
u32 peer_session;
u32 peer_bearer_id;
struct tipc_bearer *b_ptr;
u32 bearer_id;
u32 tolerance;
u32 continuity_interval;
u32 abort_limit;
......@@ -167,6 +168,7 @@ struct tipc_link {
} proto_msg;
struct tipc_msg *pmsg;
u32 priority;
char net_plane;
u32 queue_limit[15]; /* queue_limit[0]==window limit */
/* Changeover */
......
......@@ -248,7 +248,6 @@ void tipc_named_node_up(unsigned long nodearg)
u32 max_item_buf = 0;
/* compute maximum amount of publication data to send per message */
read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(node);
if (n_ptr) {
tipc_node_lock(n_ptr);
......@@ -258,7 +257,6 @@ void tipc_named_node_up(unsigned long nodearg)
ITEM_SIZE) * ITEM_SIZE;
tipc_node_unlock(n_ptr);
}
read_unlock_bh(&tipc_net_lock);
if (!max_item_buf)
return;
......
......@@ -45,34 +45,29 @@
/*
* The TIPC locking policy is designed to ensure a very fine locking
* granularity, permitting complete parallel access to individual
* port and node/link instances. The code consists of three major
* port and node/link instances. The code consists of four major
* locking domains, each protected with their own disjunct set of locks.
*
* 1: The routing hierarchy.
* Comprises the structures 'zone', 'cluster', 'node', 'link'
* and 'bearer'. The whole hierarchy is protected by a big
* read/write lock, tipc_net_lock, to enssure that nothing is added
* or removed while code is accessing any of these structures.
* This layer must not be called from the two others while they
* hold any of their own locks.
* Neither must it itself do any upcalls to the other two before
* it has released tipc_net_lock and other protective locks.
* 1: The bearer level.
* RTNL lock is used to serialize the process of configuring bearer
* on update side, and RCU lock is applied on read side to make
* bearer instance valid on both paths of message transmission and
* reception.
*
* Within the tipc_net_lock domain there are two sub-domains;'node' and
* 'bearer', where local write operations are permitted,
* provided that those are protected by individual spin_locks
* per instance. Code holding tipc_net_lock(read) and a node spin_lock
* is permitted to poke around in both the node itself and its
* subordinate links. I.e, it can update link counters and queues,
* change link state, send protocol messages, and alter the
* "active_links" array in the node; but it can _not_ remove a link
* or a node from the overall structure.
* Correspondingly, individual bearers may change status within a
* tipc_net_lock(read), protected by an individual spin_lock ber bearer
* instance, but it needs tipc_net_lock(write) to remove/add any bearers.
* 2: The node and link level.
* All node instances are saved into two tipc_node_list and node_htable
* lists. The two lists are protected by node_list_lock on write side,
* and they are guarded with RCU lock on read side. Especially node
* instance is destroyed only when TIPC module is removed, and we can
* confirm that there has no any user who is accessing the node at the
* moment. Therefore, Except for iterating the two lists within RCU
* protection, it's no needed to hold RCU that we access node instance
* in other places.
*
* In addition, all members in node structure including link instances
* are protected by node spin lock.
*
* 2: The transport level of the protocol.
* 3: The transport level of the protocol.
* This consists of the structures port, (and its user level
* representations, such as user_port and tipc_sock), reference and
* tipc_user (port.c, reg.c, socket.c).
......@@ -96,7 +91,7 @@
* There are two such lists; 'port_list', which is used for management,
* and 'wait_list', which is used to queue ports during congestion.
*
* 3: The name table (name_table.c, name_distr.c, subscription.c)
* 4: The name table (name_table.c, name_distr.c, subscription.c)
* - There is one big read/write-lock (tipc_nametbl_lock) protecting the
* overall name table structure. Nothing must be added/removed to
* this structure without holding write access to it.
......@@ -108,8 +103,6 @@
* - A local spin_lock protecting the queue of subscriber events.
*/
DEFINE_RWLOCK(tipc_net_lock);
static void net_route_named_msg(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
......@@ -175,15 +168,13 @@ void tipc_net_start(u32 addr)
{
char addr_string[16];
write_lock_bh(&tipc_net_lock);
tipc_own_addr = addr;
tipc_named_reinit();
tipc_port_reinit();
tipc_bclink_init();
write_unlock_bh(&tipc_net_lock);
tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr,
TIPC_ZONE_SCOPE, 0, tipc_own_addr);
pr_info("Started in network mode\n");
pr_info("Own node address %s, network identity %u\n",
tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id);
......@@ -195,11 +186,11 @@ void tipc_net_stop(void)
return;
tipc_nametbl_withdraw(TIPC_CFG_SRV, tipc_own_addr, 0, tipc_own_addr);
write_lock_bh(&tipc_net_lock);
rtnl_lock();
tipc_bearer_stop();
tipc_bclink_stop();
tipc_node_stop();
write_unlock_bh(&tipc_net_lock);
rtnl_unlock();
pr_info("Left network mode\n");
}
......@@ -37,8 +37,6 @@
#ifndef _TIPC_NET_H
#define _TIPC_NET_H
extern rwlock_t tipc_net_lock;
void tipc_net_route_msg(struct sk_buff *buf);
void tipc_net_start(u32 addr);
......
......@@ -148,7 +148,7 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
n_ptr->working_links++;
pr_info("Established link <%s> on network plane %c\n",
l_ptr->name, l_ptr->b_ptr->net_plane);
l_ptr->name, l_ptr->net_plane);
if (!active[0]) {
active[0] = active[1] = l_ptr;
......@@ -208,11 +208,11 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
if (!tipc_link_is_active(l_ptr)) {
pr_info("Lost standby link <%s> on network plane %c\n",
l_ptr->name, l_ptr->b_ptr->net_plane);
l_ptr->name, l_ptr->net_plane);
return;
}
pr_info("Lost link <%s> on network plane %c\n",
l_ptr->name, l_ptr->b_ptr->net_plane);
l_ptr->name, l_ptr->net_plane);
active = &n_ptr->active_links[0];
if (active[0] == l_ptr)
......@@ -239,7 +239,7 @@ int tipc_node_is_up(struct tipc_node *n_ptr)
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
n_ptr->links[l_ptr->b_ptr->identity] = l_ptr;
n_ptr->links[l_ptr->bearer_id] = l_ptr;
spin_lock_bh(&node_list_lock);
tipc_num_links++;
spin_unlock_bh(&node_list_lock);
......@@ -273,14 +273,12 @@ static void node_name_purge_complete(unsigned long node_addr)
{
struct tipc_node *n_ptr;
read_lock_bh(&tipc_net_lock);
n_ptr = tipc_node_find(node_addr);
if (n_ptr) {
tipc_node_lock(n_ptr);
n_ptr->block_setup &= ~WAIT_NAMES_GONE;
tipc_node_unlock(n_ptr);
}
read_unlock_bh(&tipc_net_lock);
}
static void node_lost_contact(struct tipc_node *n_ptr)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment