Commit 1df33a11 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: separate link aggregation from link layer

We continue the work on separating the roles of the link aggregation and
link layers, as well as making code cleanups in general.

This second commit batch focuses on moving the orchestration of link
failover and synchronization to the node level, as well as preparing the
node lock structure for further future impovements. We also make some
changes to message delivery between link and socket layer, in order to
make this mechanism safer and less obscure.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 29a3060a 440d8963
......@@ -343,7 +343,7 @@ static int tipc_enable_bearer(struct net *net, const char *name,
static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)
{
pr_info("Resetting bearer <%s>\n", b_ptr->name);
tipc_link_delete_list(net, b_ptr->identity);
tipc_node_delete_links(net, b_ptr->identity);
tipc_disc_reset(net, b_ptr);
return 0;
}
......@@ -361,7 +361,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr)
pr_info("Disabling bearer <%s>\n", b_ptr->name);
b_ptr->media->disable_media(b_ptr);
tipc_link_delete_list(net, b_ptr->identity);
tipc_node_delete_links(net, b_ptr->identity);
if (b_ptr->link_req)
tipc_disc_delete(b_ptr->link_req);
......
......@@ -109,6 +109,11 @@ struct tipc_net {
atomic_t subscription_count;
};
static inline struct tipc_net *tipc_net(struct net *net)
{
return net_generic(net, tipc_net_id);
}
static inline u16 mod(u16 x)
{
return x & 0xffffu;
......
......@@ -120,29 +120,24 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
* @buf: buffer containing message
* @bearer: bearer that message arrived on
*/
void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
void tipc_disc_rcv(struct net *net, struct sk_buff *skb,
struct tipc_bearer *bearer)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *node;
struct tipc_media_addr maddr;
struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
u32 ddom = msg_dest_domain(msg);
u32 onode = msg_prevnode(msg);
u32 net_id = msg_bc_netid(msg);
u32 mtyp = msg_type(msg);
u32 signature = msg_node_sig(msg);
u16 caps = msg_node_capabilities(msg);
bool addr_match = false;
bool sign_match = false;
bool link_up = false;
bool accept_addr = false;
bool accept_sign = false;
struct sk_buff *rskb;
struct tipc_msg *hdr = buf_msg(skb);
u32 ddom = msg_dest_domain(hdr);
u32 onode = msg_prevnode(hdr);
u32 net_id = msg_bc_netid(hdr);
u32 mtyp = msg_type(hdr);
u32 signature = msg_node_sig(hdr);
u16 caps = msg_node_capabilities(hdr);
bool respond = false;
bool dupl_addr = false;
bearer->media->msg2addr(bearer, &maddr, msg_media_addr(msg));
kfree_skb(buf);
bearer->media->msg2addr(bearer, &maddr, msg_media_addr(hdr));
kfree_skb(skb);
/* Ensure message from node is valid and communication is permitted */
if (net_id != tn->net_id)
......@@ -164,91 +159,20 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
if (!tipc_in_scope(bearer->domain, onode))
return;
node = tipc_node_create(net, onode);
if (!node)
return;
tipc_node_lock(node);
node->capabilities = caps;
/* Prepare to validate requesting node's signature and media address */
sign_match = (signature == node->signature);
tipc_node_check_dest(node, bearer, &link_up, &addr_match, &maddr);
/* These three flags give us eight permutations: */
if (sign_match && addr_match && link_up) {
/* All is fine. Do nothing. */
} else if (sign_match && addr_match && !link_up) {
/* Respond. The link will come up in due time */
respond = true;
} else if (sign_match && !addr_match && link_up) {
/* Peer has changed i/f address without rebooting.
* If so, the link will reset soon, and the next
* discovery will be accepted. So we can ignore it.
* It may also be an cloned or malicious peer having
* chosen the same node address and signature as an
* existing one.
* Ignore requests until the link goes down, if ever.
*/
disc_dupl_alert(bearer, onode, &maddr);
} else if (sign_match && !addr_match && !link_up) {
/* Peer link has changed i/f address without rebooting.
* It may also be a cloned or malicious peer; we can't
* distinguish between the two.
* The signature is correct, so we must accept.
*/
accept_addr = true;
respond = true;
} else if (!sign_match && addr_match && link_up) {
/* Peer node rebooted. Two possibilities:
* - Delayed re-discovery; this link endpoint has already
* reset and re-established contact with the peer, before
* receiving a discovery message from that node.
* (The peer happened to receive one from this node first).
* - The peer came back so fast that our side has not
* discovered it yet. Probing from this side will soon
* reset the link, since there can be no working link
* endpoint at the peer end, and the link will re-establish.
* Accept the signature, since it comes from a known peer.
*/
accept_sign = true;
} else if (!sign_match && addr_match && !link_up) {
/* The peer node has rebooted.
* Accept signature, since it is a known peer.
*/
accept_sign = true;
respond = true;
} else if (!sign_match && !addr_match && link_up) {
/* Peer rebooted with new address, or a new/duplicate peer.
* Ignore until the link goes down, if ever.
*/
tipc_node_check_dest(net, onode, bearer, caps, signature,
&maddr, &respond, &dupl_addr);
if (dupl_addr)
disc_dupl_alert(bearer, onode, &maddr);
} else if (!sign_match && !addr_match && !link_up) {
/* Peer rebooted with new address, or it is a new peer.
* Accept signature and address.
*/
accept_sign = true;
accept_addr = true;
respond = true;
}
if (accept_sign)
node->signature = signature;
if (accept_addr && !tipc_node_update_dest(node, bearer, &maddr))
respond = false;
/* Send response, if necessary */
if (respond && (mtyp == DSC_REQ_MSG)) {
rbuf = tipc_buf_acquire(MAX_H_SIZE);
if (rbuf) {
tipc_disc_init_msg(net, rbuf, DSC_RESP_MSG, bearer);
tipc_bearer_send(net, bearer->identity, rbuf, &maddr);
kfree_skb(rbuf);
rskb = tipc_buf_acquire(MAX_H_SIZE);
if (rskb) {
tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer);
tipc_bearer_send(net, bearer->identity, rskb, &maddr);
kfree_skb(rskb);
}
}
tipc_node_unlock(node);
tipc_node_put(node);
}
/**
......
......@@ -48,9 +48,8 @@
/*
* Error message prefixes
*/
static const char *link_co_err = "Link changeover error, ";
static const char *link_co_err = "Link tunneling error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";
static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
[TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
......@@ -85,46 +84,23 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
*/
#define WILDCARD_SESSION 0x10000
/* State value stored in 'failover_pkts'
/* Link FSM states:
*/
#define FIRST_FAILOVER 0xffffu
/* Link FSM states and events:
*/
enum {
TIPC_LINK_WORKING,
TIPC_LINK_PROBING,
TIPC_LINK_RESETTING,
TIPC_LINK_ESTABLISHING
};
enum {
PEER_RESET_EVT = RESET_MSG,
ACTIVATE_EVT = ACTIVATE_MSG,
TRAFFIC_EVT, /* Any other valid msg from peer */
SILENCE_EVT /* Peer was silent during last timer interval*/
LINK_ESTABLISHED = 0xe,
LINK_ESTABLISHING = 0xe << 4,
LINK_RESET = 0x1 << 8,
LINK_RESETTING = 0x2 << 12,
LINK_PEER_RESET = 0xd << 16,
LINK_FAILINGOVER = 0xf << 20,
LINK_SYNCHING = 0xc << 24
};
/* Link FSM state checking routines
*/
static int link_working(struct tipc_link *l)
static int link_is_up(struct tipc_link *l)
{
return l->state == TIPC_LINK_WORKING;
}
static int link_probing(struct tipc_link *l)
{
return l->state == TIPC_LINK_PROBING;
}
static int link_resetting(struct tipc_link *l)
{
return l->state == TIPC_LINK_RESETTING;
}
static int link_establishing(struct tipc_link *l)
{
return l->state == TIPC_LINK_ESTABLISHING;
return l->state & (LINK_ESTABLISHED | LINK_SYNCHING);
}
static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
......@@ -134,38 +110,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
/*
* Simple link routines
* Simple non-static link routines (i.e. referenced outside this file)
*/
static unsigned int align(unsigned int i)
bool tipc_link_is_up(struct tipc_link *l)
{
return (i + 3) & ~3u;
return link_is_up(l);
}
static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
bool tipc_link_is_reset(struct tipc_link *l)
{
struct tipc_node *n = l->owner;
return l->state & (LINK_RESET | LINK_FAILINGOVER | LINK_ESTABLISHING);
}
if (node_active_link(n, 0) != l)
return node_active_link(n, 0);
return node_active_link(n, 1);
bool tipc_link_is_synching(struct tipc_link *l)
{
return l->state == LINK_SYNCHING;
}
/*
* Simple non-static link routines (i.e. referenced outside this file)
*/
int tipc_link_is_up(struct tipc_link *l_ptr)
bool tipc_link_is_failingover(struct tipc_link *l)
{
if (!l_ptr)
return 0;
return link_working(l_ptr) || link_probing(l_ptr);
return l->state == LINK_FAILINGOVER;
}
bool tipc_link_is_blocked(struct tipc_link *l)
{
return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
}
int tipc_link_is_active(struct tipc_link *l)
......@@ -175,113 +147,71 @@ int tipc_link_is_active(struct tipc_link *l)
return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
}
/**
* tipc_link_create - create a new link
* @n_ptr: pointer to associated node
* @b_ptr: pointer to associated bearer
* @media_addr: media address to use when sending messages over link
*
* Returns pointer to link.
*/
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq)
static u32 link_own_addr(struct tipc_link *l)
{
struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
struct tipc_link *l_ptr;
struct tipc_msg *msg;
char *if_name;
char addr_string[16];
u32 peer = n_ptr->addr;
if (n_ptr->link_cnt >= MAX_BEARERS) {
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_err("Cannot establish %uth link to %s. Max %u allowed.\n",
n_ptr->link_cnt, addr_string, MAX_BEARERS);
return NULL;
}
if (n_ptr->links[b_ptr->identity].link) {
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_err("Attempt to establish second link on <%s> to %s\n",
b_ptr->name, addr_string);
return NULL;
}
l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
if (!l_ptr) {
pr_warn("Link creation failed, no memory\n");
return NULL;
}
l_ptr->addr = peer;
if_name = strchr(b_ptr->name, ':') + 1;
sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
tipc_node(tn->own_addr),
if_name,
tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
/* note: peer i/f name is updated by reset/activate message */
memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
l_ptr->owner = n_ptr;
l_ptr->peer_session = WILDCARD_SESSION;
l_ptr->bearer_id = b_ptr->identity;
l_ptr->tolerance = b_ptr->tolerance;
l_ptr->state = TIPC_LINK_RESETTING;
l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
msg = l_ptr->pmsg;
tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
l_ptr->addr);
msg_set_size(msg, sizeof(l_ptr->proto_msg));
msg_set_session(msg, (tn->random & 0xffff));
msg_set_bearer_id(msg, b_ptr->identity);
strcpy((char *)msg_data(msg), if_name);
l_ptr->net_plane = b_ptr->net_plane;
l_ptr->advertised_mtu = b_ptr->mtu;
l_ptr->mtu = l_ptr->advertised_mtu;
l_ptr->priority = b_ptr->priority;
tipc_link_set_queue_limits(l_ptr, b_ptr->window);
l_ptr->snd_nxt = 1;
__skb_queue_head_init(&l_ptr->transmq);
__skb_queue_head_init(&l_ptr->backlogq);
__skb_queue_head_init(&l_ptr->deferdq);
skb_queue_head_init(&l_ptr->wakeupq);
l_ptr->inputq = inputq;
l_ptr->namedq = namedq;
skb_queue_head_init(l_ptr->inputq);
link_reset_statistics(l_ptr);
tipc_node_attach_link(n_ptr, l_ptr);
return l_ptr;
return msg_prevnode(l->pmsg);
}
/**
* tipc_link_delete - Delete a link
* @l: link to be deleted
* tipc_link_create - create a new link
* @n: pointer to associated node
* @b: pointer to associated bearer
* @ownnode: identity of own node
* @peer: identity of peer node
* @maddr: media address to be used
* @inputq: queue to put messages ready for delivery
* @namedq: queue to put binding table update messages ready for delivery
* @link: return value, pointer to put the created link
*
* Returns true if link was created, otherwise false
*/
void tipc_link_delete(struct tipc_link *l)
bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session,
u32 ownnode, u32 peer, struct tipc_media_addr *maddr,
struct sk_buff_head *inputq, struct sk_buff_head *namedq,
struct tipc_link **link)
{
tipc_link_reset(l);
tipc_link_reset_fragments(l);
tipc_node_detach_link(l->owner, l);
}
void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *link;
struct tipc_node *node;
struct tipc_link *l;
struct tipc_msg *hdr;
char *if_name;
rcu_read_lock();
list_for_each_entry_rcu(node, &tn->node_list, list) {
tipc_node_lock(node);
link = node->links[bearer_id].link;
if (link)
tipc_link_delete(link);
tipc_node_unlock(node);
}
rcu_read_unlock();
l = kzalloc(sizeof(*l), GFP_ATOMIC);
if (!l)
return false;
*link = l;
/* Note: peer i/f name is completed by reset/activate message */
if_name = strchr(b->name, ':') + 1;
sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode),
if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
l->addr = peer;
l->media_addr = maddr;
l->owner = n;
l->peer_session = WILDCARD_SESSION;
l->bearer_id = b->identity;
l->tolerance = b->tolerance;
l->net_plane = b->net_plane;
l->advertised_mtu = b->mtu;
l->mtu = b->mtu;
l->priority = b->priority;
tipc_link_set_queue_limits(l, b->window);
l->inputq = inputq;
l->namedq = namedq;
l->state = LINK_RESETTING;
l->pmsg = (struct tipc_msg *)&l->proto_msg;
hdr = l->pmsg;
tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
msg_set_size(hdr, sizeof(l->proto_msg));
msg_set_session(hdr, session);
msg_set_bearer_id(hdr, l->bearer_id);
strcpy((char *)msg_data(hdr), if_name);
__skb_queue_head_init(&l->transmq);
__skb_queue_head_init(&l->backlogq);
__skb_queue_head_init(&l->deferdq);
skb_queue_head_init(&l->wakeupq);
skb_queue_head_init(l->inputq);
return true;
}
/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
......@@ -289,7 +219,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
* Give a newly added peer node the sequence number where it should
* start receiving and acking broadcast packets.
*/
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb;
......@@ -311,130 +241,159 @@ static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
* tipc_link_fsm_evt - link finite state machine
* @l: pointer to link
* @evt: state machine event to be processed
* @xmitq: queue to prepend created protocol message, if any
*/
static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
struct sk_buff_head *xmitq)
int tipc_link_fsm_evt(struct tipc_link *l, int evt)
{
int mtyp = 0, rc = 0;
struct tipc_link *pl;
enum {
LINK_RESET = 1,
LINK_ACTIVATE = (1 << 1),
SND_PROBE = (1 << 2),
SND_STATE = (1 << 3),
SND_RESET = (1 << 4),
SND_ACTIVATE = (1 << 5),
SND_BCAST_SYNC = (1 << 6)
} actions = 0;
if (l->exec_mode == TIPC_LINK_BLOCKED)
return rc;
int rc = 0;
switch (l->state) {
case TIPC_LINK_WORKING:
case LINK_RESETTING:
switch (evt) {
case TRAFFIC_EVT:
case ACTIVATE_EVT:
case LINK_PEER_RESET_EVT:
l->state = LINK_PEER_RESET;
break;
case LINK_RESET_EVT:
l->state = LINK_RESET;
break;
case LINK_FAILURE_EVT:
case LINK_FAILOVER_BEGIN_EVT:
case LINK_ESTABLISH_EVT:
case LINK_FAILOVER_END_EVT:
case LINK_SYNCH_BEGIN_EVT:
case LINK_SYNCH_END_EVT:
default:
goto illegal_evt;
}
break;
case SILENCE_EVT:
l->state = TIPC_LINK_PROBING;
actions |= SND_PROBE;
case LINK_RESET:
switch (evt) {
case LINK_PEER_RESET_EVT:
l->state = LINK_ESTABLISHING;
break;
case PEER_RESET_EVT:
actions |= LINK_RESET | SND_ACTIVATE;
case LINK_FAILOVER_BEGIN_EVT:
l->state = LINK_FAILINGOVER;
case LINK_FAILURE_EVT:
case LINK_RESET_EVT:
case LINK_ESTABLISH_EVT:
case LINK_FAILOVER_END_EVT:
break;
case LINK_SYNCH_BEGIN_EVT:
case LINK_SYNCH_END_EVT:
default:
pr_debug("%s%u WORKING\n", link_unk_evt, evt);
goto illegal_evt;
}
break;
case TIPC_LINK_PROBING:
case LINK_PEER_RESET:
switch (evt) {
case TRAFFIC_EVT:
case ACTIVATE_EVT:
l->state = TIPC_LINK_WORKING;
case LINK_RESET_EVT:
l->state = LINK_ESTABLISHING;
break;
case LINK_PEER_RESET_EVT:
case LINK_ESTABLISH_EVT:
case LINK_FAILURE_EVT:
break;
case PEER_RESET_EVT:
actions |= LINK_RESET | SND_ACTIVATE;
case LINK_SYNCH_BEGIN_EVT:
case LINK_SYNCH_END_EVT:
case LINK_FAILOVER_BEGIN_EVT:
case LINK_FAILOVER_END_EVT:
default:
goto illegal_evt;
}
break;
case SILENCE_EVT:
if (l->silent_intv_cnt <= l->abort_limit) {
actions |= SND_PROBE;
case LINK_FAILINGOVER:
switch (evt) {
case LINK_FAILOVER_END_EVT:
l->state = LINK_RESET;
break;
case LINK_PEER_RESET_EVT:
case LINK_RESET_EVT:
case LINK_ESTABLISH_EVT:
case LINK_FAILURE_EVT:
break;
case LINK_FAILOVER_BEGIN_EVT:
case LINK_SYNCH_BEGIN_EVT:
case LINK_SYNCH_END_EVT:
default:
goto illegal_evt;
}
actions |= LINK_RESET | SND_RESET;
break;
case LINK_ESTABLISHING:
switch (evt) {
case LINK_ESTABLISH_EVT:
l->state = LINK_ESTABLISHED;
rc |= TIPC_LINK_UP_EVT;
break;
case LINK_FAILOVER_BEGIN_EVT:
l->state = LINK_FAILINGOVER;
break;
case LINK_PEER_RESET_EVT:
case LINK_RESET_EVT:
case LINK_FAILURE_EVT:
case LINK_SYNCH_BEGIN_EVT:
case LINK_FAILOVER_END_EVT:
break;
case LINK_SYNCH_END_EVT:
default:
pr_err("%s%u PROBING\n", link_unk_evt, evt);
goto illegal_evt;
}
break;
case TIPC_LINK_RESETTING:
case LINK_ESTABLISHED:
switch (evt) {
case TRAFFIC_EVT:
case LINK_PEER_RESET_EVT:
l->state = LINK_PEER_RESET;
rc |= TIPC_LINK_DOWN_EVT;
break;
case ACTIVATE_EVT:
pl = node_active_link(l->owner, 0);
if (pl && link_probing(pl))
case LINK_FAILURE_EVT:
l->state = LINK_RESETTING;
rc |= TIPC_LINK_DOWN_EVT;
break;
actions |= LINK_ACTIVATE;
if (!l->owner->working_links)
actions |= SND_BCAST_SYNC;
case LINK_RESET_EVT:
l->state = LINK_RESET;
break;
case PEER_RESET_EVT:
l->state = TIPC_LINK_ESTABLISHING;
actions |= SND_ACTIVATE;
case LINK_ESTABLISH_EVT:
break;
case SILENCE_EVT:
actions |= SND_RESET;
case LINK_SYNCH_BEGIN_EVT:
l->state = LINK_SYNCHING;
break;
case LINK_SYNCH_END_EVT:
case LINK_FAILOVER_BEGIN_EVT:
case LINK_FAILOVER_END_EVT:
default:
pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
goto illegal_evt;
}
break;
case TIPC_LINK_ESTABLISHING:
case LINK_SYNCHING:
switch (evt) {
case TRAFFIC_EVT:
case ACTIVATE_EVT:
pl = node_active_link(l->owner, 0);
if (pl && link_probing(pl))
case LINK_PEER_RESET_EVT:
l->state = LINK_PEER_RESET;
rc |= TIPC_LINK_DOWN_EVT;
break;
actions |= LINK_ACTIVATE;
if (!l->owner->working_links)
actions |= SND_BCAST_SYNC;
case LINK_FAILURE_EVT:
l->state = LINK_RESETTING;
rc |= TIPC_LINK_DOWN_EVT;
break;
case LINK_RESET_EVT:
l->state = LINK_RESET;
break;
case PEER_RESET_EVT:
case LINK_ESTABLISH_EVT:
case LINK_SYNCH_BEGIN_EVT:
break;
case SILENCE_EVT:
actions |= SND_ACTIVATE;
case LINK_SYNCH_END_EVT:
l->state = LINK_ESTABLISHED;
break;
case LINK_FAILOVER_BEGIN_EVT:
case LINK_FAILOVER_END_EVT:
default:
pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
goto illegal_evt;
}
break;
default:
pr_err("Unknown link state %u/%u\n", l->state, evt);
}
/* Perform actions as decided by FSM */
if (actions & LINK_RESET) {
l->exec_mode = TIPC_LINK_BLOCKED;
rc |= TIPC_LINK_DOWN_EVT;
}
if (actions & LINK_ACTIVATE) {
l->exec_mode = TIPC_LINK_OPEN;
rc |= TIPC_LINK_UP_EVT;
pr_err("Unknown FSM state %x in %s\n", l->state, l->name);
}
if (actions & (SND_STATE | SND_PROBE))
mtyp = STATE_MSG;
if (actions & SND_RESET)
mtyp = RESET_MSG;
if (actions & SND_ACTIVATE)
mtyp = ACTIVATE_MSG;
if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
0, 0, 0, xmitq);
if (actions & SND_BCAST_SYNC)
tipc_link_build_bcast_sync_msg(l, xmitq);
return rc;
illegal_evt:
pr_err("Illegal FSM event %x in state %x on link %s\n",
evt, l->state, l->name);
return rc;
}
......@@ -484,13 +443,45 @@ static void link_profile_stats(struct tipc_link *l)
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
int rc = 0;
int mtyp = STATE_MSG;
bool xmit = false;
bool prb = false;
link_profile_stats(l);
if (l->silent_intv_cnt)
rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
else if (link_working(l) && tipc_bclink_acks_missing(l->owner))
tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
switch (l->state) {
case LINK_ESTABLISHED:
case LINK_SYNCHING:
if (!l->silent_intv_cnt) {
if (tipc_bclink_acks_missing(l->owner))
xmit = true;
} else if (l->silent_intv_cnt <= l->abort_limit) {
xmit = true;
prb = true;
} else {
rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
l->silent_intv_cnt++;
break;
case LINK_RESET:
xmit = true;
mtyp = RESET_MSG;
break;
case LINK_ESTABLISHING:
xmit = true;
mtyp = ACTIVATE_MSG;
break;
case LINK_PEER_RESET:
case LINK_RESETTING:
case LINK_FAILINGOVER:
break;
default:
break;
}
if (xmit)
tipc_link_build_proto_msg(l, mtyp, prb, 0, 0, 0, xmitq);
return rc;
}
......@@ -550,8 +541,6 @@ void link_prepare_wakeup(struct tipc_link *l)
break;
skb_unlink(skb, &l->wakeupq);
skb_queue_tail(l->inputq, skb);
l->owner->inputq = l->inputq;
l->owner->action_flags |= TIPC_MSG_EVT;
}
}
......@@ -587,69 +576,36 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)
tipc_link_reset_fragments(l_ptr);
}
void tipc_link_reset(struct tipc_link *l_ptr)
void tipc_link_reset(struct tipc_link *l)
{
u32 prev_state = l_ptr->state;
int was_active_link = tipc_link_is_active(l_ptr);
struct tipc_node *owner = l_ptr->owner;
struct tipc_link *pl = tipc_parallel_link(l_ptr);
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
tipc_link_fsm_evt(l, LINK_RESET_EVT);
/* Link is down, accept any session */
l_ptr->peer_session = WILDCARD_SESSION;
/* Prepare for renewed mtu size negotiation */
l_ptr->mtu = l_ptr->advertised_mtu;
l->peer_session = WILDCARD_SESSION;
l_ptr->state = TIPC_LINK_RESETTING;
/* If peer is up, it only accepts an incremented session number */
msg_set_session(l->pmsg, msg_session(l->pmsg) + 1);
if ((prev_state == TIPC_LINK_RESETTING) ||
(prev_state == TIPC_LINK_ESTABLISHING))
return;
tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
l_ptr->exec_mode = TIPC_LINK_BLOCKED;
l_ptr->failover_checkpt = l_ptr->rcv_nxt;
pl->failover_pkts = FIRST_FAILOVER;
pl->failover_checkpt = l_ptr->rcv_nxt;
pl->failover_skb = l_ptr->reasm_buf;
} else {
kfree_skb(l_ptr->reasm_buf);
}
/* Clean up all queues, except inputq: */
__skb_queue_purge(&l_ptr->transmq);
__skb_queue_purge(&l_ptr->deferdq);
if (!owner->inputq)
owner->inputq = l_ptr->inputq;
skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
if (!skb_queue_empty(owner->inputq))
owner->action_flags |= TIPC_MSG_EVT;
tipc_link_purge_backlog(l_ptr);
l_ptr->reasm_buf = NULL;
l_ptr->rcv_unacked = 0;
l_ptr->snd_nxt = 1;
l_ptr->rcv_nxt = 1;
l_ptr->silent_intv_cnt = 0;
l_ptr->stats.recv_info = 0;
l_ptr->stale_count = 0;
link_reset_statistics(l_ptr);
}
void tipc_link_activate(struct tipc_link *link)
{
struct tipc_node *node = link->owner;
link->rcv_nxt = 1;
link->stats.recv_info = 1;
link->silent_intv_cnt = 0;
link->state = TIPC_LINK_WORKING;
link->exec_mode = TIPC_LINK_OPEN;
tipc_node_link_up(node, link->bearer_id);
tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
/* Prepare for renewed mtu size negotiation */
l->mtu = l->advertised_mtu;
/* Clean up all queues: */
__skb_queue_purge(&l->transmq);
__skb_queue_purge(&l->deferdq);
skb_queue_splice_init(&l->wakeupq, l->inputq);
tipc_link_purge_backlog(l);
kfree_skb(l->reasm_buf);
kfree_skb(l->failover_reasm_skb);
l->reasm_buf = NULL;
l->failover_reasm_skb = NULL;
l->rcv_unacked = 0;
l->snd_nxt = 1;
l->rcv_nxt = 1;
l->silent_intv_cnt = 0;
l->stats.recv_info = 0;
l->stale_count = 0;
link_reset_statistics(l);
}
/**
......@@ -671,7 +627,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
u16 ack = mod(link->rcv_nxt - 1);
u16 seqno = link->snd_nxt;
u16 bc_last_in = link->owner->bclink.last_in;
struct tipc_media_addr *addr = &link->media_addr;
struct tipc_media_addr *addr = link->media_addr;
struct sk_buff_head *transmq = &link->transmq;
struct sk_buff_head *backlogq = &link->backlogq;
struct sk_buff *skb, *bskb;
......@@ -792,20 +748,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
return 0;
}
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
skb_queue_head_init(list);
__skb_queue_tail(list, skb);
}
static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
{
struct sk_buff_head head;
skb2list(skb, &head);
return __tipc_link_xmit(link->owner->net, link, &head);
}
/*
* tipc_link_sync_rcv - synchronize broadcast link endpoints.
* Receive the sequence number where we should start receiving and
......@@ -851,7 +793,7 @@ void tipc_link_push_packets(struct tipc_link *link)
link->rcv_unacked = 0;
__skb_queue_tail(&link->transmq, skb);
tipc_bearer_send(link->owner->net, link->bearer_id,
skb, &link->media_addr);
skb, link->media_addr);
}
link->snd_nxt = seqno;
}
......@@ -884,26 +826,6 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
l->snd_nxt = seqno;
}
void tipc_link_reset_all(struct tipc_node *node)
{
char addr_string[16];
u32 i;
tipc_node_lock(node);
pr_warn("Resetting all links to %s\n",
tipc_addr_string_fill(addr_string, node->addr));
for (i = 0; i < MAX_BEARERS; i++) {
if (node->links[i].link) {
link_print(node->links[i].link, "Resetting link\n");
tipc_link_reset(node->links[i].link);
}
}
tipc_node_unlock(node);
}
static void link_retransmit_failure(struct tipc_link *l_ptr,
struct sk_buff *buf)
{
......@@ -920,7 +842,6 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
msg_errcode(msg));
pr_info("sqno %u, prev: %x, src: %x\n",
msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
tipc_link_reset(l_ptr);
} else {
/* Handle failure on broadcast link */
struct tipc_node *n_ptr;
......@@ -975,7 +896,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
&l_ptr->media_addr);
l_ptr->media_addr);
retransmits--;
l_ptr->stats.retransmitted++;
}
......@@ -996,7 +917,7 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
l->stale_count = 1;
} else if (++l->stale_count > 100) {
link_retransmit_failure(l, skb);
return TIPC_LINK_DOWN_EVT;
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
skb_queue_walk(&l->transmq, skb) {
if (!retransm)
......@@ -1016,60 +937,27 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
return 0;
}
/* link_synch(): check if all packets arrived before the synch
* point have been consumed
* Returns true if the parallel links are synched, otherwise false
*/
static bool link_synch(struct tipc_link *l)
{
unsigned int post_synch;
struct tipc_link *pl;
pl = tipc_parallel_link(l);
if (pl == l)
goto synched;
/* Was last pre-synch packet added to input queue ? */
if (less_eq(pl->rcv_nxt, l->synch_point))
return false;
/* Is it still in the input queue ? */
post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
if (skb_queue_len(pl->inputq) > post_synch)
return false;
synched:
l->exec_mode = TIPC_LINK_OPEN;
return true;
}
/* tipc_data_input - deliver data and name distr msgs to upper layer
*
* Consumes buffer if message is of right type
* Node lock must be held
*/
static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
struct tipc_node *node = link->owner;
struct tipc_msg *msg = buf_msg(skb);
u32 dport = msg_destport(msg);
switch (msg_user(msg)) {
switch (msg_user(buf_msg(skb))) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
node->inputq = link->inputq;
node->action_flags |= TIPC_MSG_EVT;
}
__skb_queue_tail(inputq, skb);
return true;
case NAME_DISTRIBUTOR:
node->bclink.recv_permitted = true;
node->namedq = link->namedq;
skb_queue_tail(link->namedq, skb);
if (skb_queue_len(link->namedq) == 1)
node->action_flags |= TIPC_NAMED_MSG_EVT;
return true;
case MSG_BUNDLER:
case TUNNEL_PROTOCOL:
......@@ -1086,51 +974,59 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
/* tipc_link_input - process packet that has passed link protocol check
*
* Consumes buffer
* Node lock must be held
*/
static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
struct tipc_node *node = link->owner;
struct tipc_msg *msg = buf_msg(skb);
struct tipc_node *node = l->owner;
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff **reasm_skb = &l->reasm_buf;
struct sk_buff *iskb;
int usr = msg_user(hdr);
int rc = 0;
int pos = 0;
int ipos = 0;
switch (msg_user(msg)) {
case TUNNEL_PROTOCOL:
if (msg_dup(msg)) {
link->exec_mode = TIPC_LINK_TUNNEL;
link->synch_point = msg_seqno(msg_get_wrapped(msg));
kfree_skb(skb);
break;
if (unlikely(usr == TUNNEL_PROTOCOL)) {
if (msg_type(hdr) == SYNCH_MSG) {
__skb_queue_purge(&l->deferdq);
goto drop;
}
if (!tipc_link_failover_rcv(link, &skb))
break;
if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
tipc_data_input(link, skb);
break;
if (!tipc_msg_extract(skb, &iskb, &ipos))
return rc;
kfree_skb(skb);
skb = iskb;
hdr = buf_msg(skb);
if (less(msg_seqno(hdr), l->drop_point))
goto drop;
if (tipc_data_input(l, skb, inputq))
return rc;
usr = msg_user(hdr);
reasm_skb = &l->failover_reasm_skb;
}
case MSG_BUNDLER:
link->stats.recv_bundles++;
link->stats.recv_bundled += msg_msgcnt(msg);
if (usr == MSG_BUNDLER) {
l->stats.recv_bundles++;
l->stats.recv_bundled += msg_msgcnt(hdr);
while (tipc_msg_extract(skb, &iskb, &pos))
tipc_data_input(link, iskb);
break;
case MSG_FRAGMENTER:
link->stats.recv_fragments++;
if (tipc_buf_append(&link->reasm_buf, &skb)) {
link->stats.recv_fragmented++;
tipc_data_input(link, skb);
} else if (!link->reasm_buf) {
tipc_link_reset(link);
tipc_data_input(l, iskb, inputq);
return 0;
} else if (usr == MSG_FRAGMENTER) {
l->stats.recv_fragments++;
if (tipc_buf_append(reasm_skb, &skb)) {
l->stats.recv_fragmented++;
tipc_data_input(l, skb, inputq);
} else if (!*reasm_skb) {
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
break;
case BCAST_PROTOCOL:
return 0;
} else if (usr == BCAST_PROTOCOL) {
tipc_link_sync_rcv(node, skb);
break;
default:
break;
};
return 0;
}
drop:
kfree_skb(skb);
return 0;
}
static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
......@@ -1157,11 +1053,13 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct sk_buff_head *arrvq = &l->deferdq;
struct sk_buff *tmp;
struct sk_buff_head tmpq;
struct tipc_msg *hdr;
u16 seqno, rcv_nxt;
int rc = 0;
__skb_queue_head_init(&tmpq);
if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
tipc_link_build_proto_msg(l, STATE_MSG, 0,
......@@ -1169,21 +1067,21 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
return rc;
}
skb_queue_walk_safe(arrvq, skb, tmp) {
while ((skb = skb_peek(arrvq))) {
hdr = buf_msg(skb);
/* Verify and update link state */
if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
__skb_dequeue(arrvq);
rc |= tipc_link_proto_rcv(l, skb, xmitq);
rc = tipc_link_proto_rcv(l, skb, xmitq);
continue;
}
if (unlikely(!link_working(l))) {
rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
if (!link_working(l)) {
if (unlikely(!link_is_up(l))) {
rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
if (!link_is_up(l)) {
kfree_skb(__skb_dequeue(arrvq));
return rc;
goto exit;
}
}
......@@ -1201,7 +1099,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
rcv_nxt = l->rcv_nxt;
if (unlikely(less(rcv_nxt, seqno))) {
l->stats.deferred_recv++;
return rc;
goto exit;
}
__skb_dequeue(arrvq);
......@@ -1210,21 +1108,14 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
if (unlikely(more(rcv_nxt, seqno))) {
l->stats.duplicates++;
kfree_skb(skb);
return rc;
}
/* Synchronize with parallel link if applicable */
if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
if (!msg_dup(hdr) && !link_synch(l)) {
kfree_skb(skb);
return rc;
goto exit;
}
/* Packet can be delivered */
l->rcv_nxt++;
l->stats.recv_info++;
if (unlikely(!tipc_data_input(l, skb)))
tipc_link_input(l, skb);
if (unlikely(!tipc_data_input(l, skb, &tmpq)))
rc = tipc_link_input(l, skb, &tmpq);
/* Ack at regular intervals */
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
......@@ -1234,6 +1125,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
0, 0, 0, 0, xmitq);
}
}
exit:
tipc_skb_queue_splice_tail(&tmpq, l->inputq);
return rc;
}
......@@ -1291,7 +1184,7 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
skb = __skb_dequeue(&xmitq);
if (!skb)
return;
tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr);
l->rcv_unacked = 0;
kfree_skb(skb);
}
......@@ -1310,7 +1203,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
int node_up = l->owner->bclink.recv_permitted;
/* Don't send protocol message during reset or link failover */
if (l->exec_mode == TIPC_LINK_BLOCKED)
if (tipc_link_is_blocked(l))
return;
msg_set_type(hdr, mtyp);
......@@ -1345,7 +1238,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
} else {
/* RESET_MSG or ACTIVATE_MSG */
msg_set_max_pkt(hdr, l->advertised_mtu);
msg_set_ack(hdr, l->failover_checkpt - 1);
msg_set_ack(hdr, l->rcv_nxt - 1);
msg_set_next_sent(hdr, 1);
}
skb = tipc_buf_acquire(msg_size(hdr));
......@@ -1353,220 +1246,74 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
return;
skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
skb->priority = TC_PRIO_CONTROL;
__skb_queue_head(xmitq, skb);
__skb_queue_tail(xmitq, skb);
}
/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
* a different bearer. Owner node is locked.
/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets
* with contents of the link's tranmsit and backlog queues.
*/
static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
struct tipc_msg *tunnel_hdr,
struct tipc_msg *msg,
u32 selector)
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
int mtyp, struct sk_buff_head *xmitq)
{
struct tipc_link *tunnel;
struct sk_buff *skb;
u32 length = msg_size(msg);
struct sk_buff *skb, *tnlskb;
struct tipc_msg *hdr, tnlhdr;
struct sk_buff_head *queue = &l->transmq;
struct sk_buff_head tmpxq, tnlq;
u16 pktlen, pktcnt, seqno = l->snd_nxt;
tunnel = node_active_link(l_ptr->owner, selector & 1);
if (!tipc_link_is_up(tunnel)) {
pr_warn("%stunnel link no longer available\n", link_co_err);
return;
}
msg_set_size(tunnel_hdr, length + INT_H_SIZE);
skb = tipc_buf_acquire(length + INT_H_SIZE);
if (!skb) {
pr_warn("%sunable to send tunnel msg\n", link_co_err);
if (!tnl)
return;
}
skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
__tipc_link_xmit_skb(tunnel, skb);
}
skb_queue_head_init(&tnlq);
skb_queue_head_init(&tmpxq);
/* tipc_link_failover_send_queue(): A link has gone down, but a second
* link is still active. We can do failover. Tunnel the failing link's
* whole send queue via the remaining link. This way, we don't lose
* any packets, and sequence order is preserved for subsequent traffic
* sent over the remaining link. Owner node is locked.
*/
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
int msgcount;
struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
struct tipc_msg tunnel_hdr;
struct sk_buff *skb;
int split_bundles;
if (!tunnel)
return;
tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);
skb_queue_walk(&l_ptr->backlogq, skb) {
msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
}
skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
tipc_link_purge_backlog(l_ptr);
msgcount = skb_queue_len(&l_ptr->transmq);
msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
msg_set_msgcnt(&tunnel_hdr, msgcount);
if (skb_queue_empty(&l_ptr->transmq)) {
skb = tipc_buf_acquire(INT_H_SIZE);
if (skb) {
skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
msg_set_size(&tunnel_hdr, INT_H_SIZE);
__tipc_link_xmit_skb(tunnel, skb);
} else {
pr_warn("%sunable to send changeover msg\n",
link_co_err);
}
/* At least one packet required for safe algorithm => add dummy */
skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
BASIC_H_SIZE, 0, l->addr, link_own_addr(l),
0, 0, TIPC_ERR_NO_PORT);
if (!skb) {
pr_warn("%sunable to create tunnel packet\n", link_co_err);
return;
}
split_bundles = (node_active_link(l_ptr->owner, 0) !=
node_active_link(l_ptr->owner, 0));
skb_queue_walk(&l_ptr->transmq, skb) {
struct tipc_msg *msg = buf_msg(skb);
if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
struct tipc_msg *m = msg_get_wrapped(msg);
unchar *pos = (unchar *)m;
msgcount = msg_msgcnt(msg);
while (msgcount--) {
msg_set_seqno(m, msg_seqno(msg));
tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
msg_link_selector(m));
pos += align(msg_size(m));
m = (struct tipc_msg *)pos;
}
} else {
tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
msg_link_selector(msg));
}
}
}
/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
* duplicate of the first link's send queue via the new link. This way, we
* are guaranteed that currently queued packets from a socket are delivered
* before future traffic from the same socket, even if this is using the
* new link. The last arriving copy of each duplicate packet is dropped at
* the receiving end by the regular protocol check, so packet cardinality
* and sequence order is preserved per sender/receiver socket pair.
* Owner node is locked.
*/
void tipc_link_dup_queue_xmit(struct tipc_link *link,
struct tipc_link *tnl)
{
struct sk_buff *skb;
struct tipc_msg tnl_hdr;
struct sk_buff_head *queue = &link->transmq;
int mcnt;
u16 seqno;
tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
SYNCH_MSG, INT_H_SIZE, link->addr);
mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
msg_set_msgcnt(&tnl_hdr, mcnt);
msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
tunnel_queue:
skb_queue_tail(&tnlq, skb);
tipc_link_xmit(l, &tnlq, &tmpxq);
__skb_queue_purge(&tmpxq);
/* Initialize reusable tunnel packet header */
tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL,
mtyp, INT_H_SIZE, l->addr);
pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
msg_set_msgcnt(&tnlhdr, pktcnt);
msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
tnl:
/* Wrap each packet into a tunnel packet */
skb_queue_walk(queue, skb) {
struct sk_buff *outskb;
struct tipc_msg *msg = buf_msg(skb);
u32 len = msg_size(msg);
msg_set_ack(msg, mod(link->rcv_nxt - 1));
msg_set_bcast_ack(msg, link->owner->bclink.last_in);
msg_set_size(&tnl_hdr, len + INT_H_SIZE);
outskb = tipc_buf_acquire(len + INT_H_SIZE);
if (outskb == NULL) {
pr_warn("%sunable to send duplicate msg\n",
link_co_err);
hdr = buf_msg(skb);
if (queue == &l->backlogq)
msg_set_seqno(hdr, seqno++);
pktlen = msg_size(hdr);
msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE);
if (!tnlskb) {
pr_warn("%sunable to send packet\n", link_co_err);
return;
}
skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
skb->data, len);
__tipc_link_xmit_skb(tnl, outskb);
if (!tipc_link_is_up(link))
return;
skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen);
__skb_queue_tail(&tnlq, tnlskb);
}
if (queue == &link->backlogq)
return;
seqno = link->snd_nxt;
skb_queue_walk(&link->backlogq, skb) {
msg_set_seqno(buf_msg(skb), seqno);
seqno = mod(seqno + 1);
if (queue != &l->backlogq) {
queue = &l->backlogq;
goto tnl;
}
queue = &link->backlogq;
goto tunnel_queue;
}
/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
* Owner node is locked.
*/
static bool tipc_link_failover_rcv(struct tipc_link *link,
struct sk_buff **skb)
{
struct tipc_msg *msg = buf_msg(*skb);
struct sk_buff *iskb = NULL;
struct tipc_link *pl = NULL;
int bearer_id = msg_bearer_id(msg);
int pos = 0;
tipc_link_xmit(tnl, &tnlq, xmitq);
if (msg_type(msg) != FAILOVER_MSG) {
pr_warn("%sunknown tunnel pkt received\n", link_co_err);
goto exit;
if (mtyp == FAILOVER_MSG) {
tnl->drop_point = l->rcv_nxt;
tnl->failover_reasm_skb = l->reasm_buf;
l->reasm_buf = NULL;
}
if (bearer_id >= MAX_BEARERS)
goto exit;
if (bearer_id == link->bearer_id)
goto exit;
pl = link->owner->links[bearer_id].link;
if (pl && tipc_link_is_up(pl))
tipc_link_reset(pl);
if (link->failover_pkts == FIRST_FAILOVER)
link->failover_pkts = msg_msgcnt(msg);
/* Should we expect an inner packet? */
if (!link->failover_pkts)
goto exit;
if (!tipc_msg_extract(*skb, &iskb, &pos)) {
pr_warn("%sno inner failover pkt\n", link_co_err);
*skb = NULL;
goto exit;
}
link->failover_pkts--;
*skb = NULL;
/* Was this packet already delivered? */
if (less(buf_seqno(iskb), link->failover_checkpt)) {
kfree_skb(iskb);
iskb = NULL;
goto exit;
}
if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
link->stats.recv_fragments++;
tipc_buf_append(&link->failover_skb, &iskb);
}
exit:
if (!link->failover_pkts && pl)
pl->exec_mode = TIPC_LINK_OPEN;
kfree_skb(*skb);
*skb = iskb;
return *skb;
}
/* tipc_link_proto_rcv(): receive link level protocol message :
......@@ -1586,7 +1333,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
char *if_name;
int rc = 0;
if (l->exec_mode == TIPC_LINK_BLOCKED)
if (tipc_link_is_blocked(l))
goto exit;
if (link_own_addr(l) > msg_prevnode(hdr))
......@@ -1600,6 +1347,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
(l->peer_session != WILDCARD_SESSION))
break;
/* fall thru' */
case ACTIVATE_MSG:
/* Complete own link name with peer's interface name */
......@@ -1618,13 +1366,20 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
l->priority = peers_prio;
if (msg_type(hdr) == RESET_MSG) {
rc |= tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT);
} else if (!link_is_up(l)) {
tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT);
rc |= tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
}
l->peer_session = msg_session(hdr);
l->peer_bearer_id = msg_bearer_id(hdr);
rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
if (l->mtu > msg_max_pkt(hdr))
l->mtu = msg_max_pkt(hdr);
break;
case STATE_MSG:
/* Update own tolerance if peer indicates a non-zero value */
if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
l->tolerance = peers_tol;
......@@ -1633,11 +1388,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
l->stats.recv_states++;
if (msg_probe(hdr))
l->stats.recv_probes++;
rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
if (!tipc_link_is_up(l))
rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
if (!link_is_up(l))
break;
/* Has peer sent packets we haven't received yet ? */
/* Send NACK if peer has sent pkts we haven't received yet */
if (more(peers_snd_nxt, l->rcv_nxt))
rcvgap = peers_snd_nxt - l->rcv_nxt;
if (rcvgap || (msg_probe(hdr)))
......@@ -1647,9 +1402,10 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
/* If NACK, retransmit will now start at right position */
if (nacked_gap) {
rc |= tipc_link_retransm(l, nacked_gap, xmitq);
rc = tipc_link_retransm(l, nacked_gap, xmitq);
l->stats.recv_nacks++;
}
tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq)))
link_prepare_wakeup(l);
......@@ -1726,19 +1482,7 @@ static void link_print(struct tipc_link *l, const char *str)
u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
u16 tail = l->snd_nxt - 1;
pr_info("%s Link <%s>:", str, l->name);
if (link_probing(l))
pr_cont(":P\n");
else if (link_establishing(l))
pr_cont(":E\n");
else if (link_resetting(l))
pr_cont(":R\n");
else if (link_working(l))
pr_cont(":W\n");
else
pr_cont("\n");
pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n",
skb_queue_len(&l->transmq), head, tail,
skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);
......
......@@ -49,13 +49,17 @@
*/
#define INVALID_LINK_SEQ 0x10000
/* Link endpoint receive states
/* Link FSM events:
*/
enum {
TIPC_LINK_OPEN,
TIPC_LINK_BLOCKED,
TIPC_LINK_TUNNEL
LINK_ESTABLISH_EVT = 0xec1ab1e,
LINK_PEER_RESET_EVT = 0x9eed0e,
LINK_FAILURE_EVT = 0xfa110e,
LINK_RESET_EVT = 0x10ca1d0e,
LINK_FAILOVER_BEGIN_EVT = 0xfa110bee,
LINK_FAILOVER_END_EVT = 0xfa110ede,
LINK_SYNCH_BEGIN_EVT = 0xc1ccbee,
LINK_SYNCH_END_EVT = 0xc1ccede
};
/* Events returned from link at packet reception or at timeout
......@@ -120,7 +124,6 @@ struct tipc_stats {
* @pmsg: convenience pointer to "proto_msg" field
* @priority: current link priority
* @net_plane: current link network plane ('A' through 'H')
* @exec_mode: transmit/receive mode for link endpoint instance
* @backlog_limit: backlog queue congestion thresholds (indexed by importance)
* @exp_msg_count: # of tunnelled messages expected during link changeover
* @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
......@@ -145,7 +148,7 @@ struct tipc_stats {
struct tipc_link {
u32 addr;
char name[TIPC_MAX_LINK_NAME];
struct tipc_media_addr media_addr;
struct tipc_media_addr *media_addr;
struct tipc_node *owner;
/* Management and link supervision data */
......@@ -155,7 +158,7 @@ struct tipc_link {
u32 tolerance;
unsigned long keepalive_intv;
u32 abort_limit;
int state;
u32 state;
u32 silent_intv_cnt;
struct {
unchar hdr[INT_H_SIZE];
......@@ -164,13 +167,10 @@ struct tipc_link {
struct tipc_msg *pmsg;
u32 priority;
char net_plane;
u8 exec_mode;
u16 synch_point;
/* Failover */
u16 failover_pkts;
u16 failover_checkpt;
struct sk_buff *failover_skb;
/* Failover/synch */
u16 drop_point;
struct sk_buff *failover_reasm_skb;
/* Max packet negotiation */
u16 mtu;
......@@ -205,25 +205,25 @@ struct tipc_link {
struct tipc_stats stats;
};
struct tipc_port;
struct tipc_link *tipc_link_create(struct tipc_node *n,
struct tipc_bearer *b,
const struct tipc_media_addr *maddr,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq);
void tipc_link_delete(struct tipc_link *link);
void tipc_link_delete_list(struct net *net, unsigned int bearer_id);
void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest);
bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session,
u32 ownnode, u32 peer, struct tipc_media_addr *maddr,
struct sk_buff_head *inputq, struct sk_buff_head *namedq,
struct tipc_link **link);
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
int mtyp, struct sk_buff_head *xmitq);
void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
int tipc_link_fsm_evt(struct tipc_link *l, int evt);
void tipc_link_reset_fragments(struct tipc_link *l_ptr);
int tipc_link_is_up(struct tipc_link *l_ptr);
bool tipc_link_is_up(struct tipc_link *l);
bool tipc_link_is_reset(struct tipc_link *l);
bool tipc_link_is_synching(struct tipc_link *l);
bool tipc_link_is_failingover(struct tipc_link *l);
bool tipc_link_is_blocked(struct tipc_link *l);
int tipc_link_is_active(struct tipc_link *l_ptr);
void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_activate(struct tipc_link *link);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
......@@ -243,13 +243,8 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
void link_prepare_wakeup(struct tipc_link *l);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
static inline u32 link_own_addr(struct tipc_link *l)
{
return msg_prevnode(l->pmsg);
}
#endif
......@@ -110,7 +110,6 @@ struct tipc_skb_cb {
struct sk_buff *tail;
bool validated;
bool wakeup_pending;
bool bundling;
u16 chain_sz;
u16 chain_imp;
};
......@@ -559,15 +558,6 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
msg_set_bits(m, 1, 15, 0x1fff, n);
}
static inline bool msg_dup(struct tipc_msg *m)
{
if (likely(msg_user(m) != TUNNEL_PROTOCOL))
return false;
if (msg_type(m) != SYNCH_MSG)
return false;
return true;
}
/*
* Word 2
*/
......@@ -621,12 +611,12 @@ static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
}
static inline u32 msg_next_sent(struct tipc_msg *m)
static inline u16 msg_next_sent(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}
static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
static inline void msg_set_next_sent(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 4, 0, 0xffff, n);
}
......@@ -727,12 +717,12 @@ static inline char *msg_media_addr(struct tipc_msg *m)
/*
* Word 9
*/
static inline u32 msg_msgcnt(struct tipc_msg *m)
static inline u16 msg_msgcnt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
static inline void msg_set_msgcnt(struct tipc_msg *m, u32 n)
static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
......@@ -767,19 +757,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
static inline bool msg_is_traffic(struct tipc_msg *m)
static inline bool msg_peer_link_is_up(struct tipc_msg *m)
{
if (likely(msg_user(m) != LINK_PROTOCOL))
return true;
if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
return false;
if (msg_type(m) == STATE_MSG)
return true;
return false;
}
static inline bool msg_peer_is_up(struct tipc_msg *m)
static inline bool msg_peer_node_is_up(struct tipc_msg *m)
{
if (likely(msg_is_traffic(m)))
return false;
if (msg_peer_link_is_up(m))
return true;
return msg_redundant_link(m);
}
......@@ -872,28 +862,6 @@ static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
return skb;
}
/* tipc_skb_queue_tail(): add buffer to tail of list;
* @list: list to be appended to
* @skb: buffer to append. Always appended
* @dport: the destination port of the buffer
* returns true if dport differs from previous destination
*/
static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
struct sk_buff *skb, u32 dport)
{
struct sk_buff *_skb = NULL;
bool rv = false;
spin_lock_bh(&list->lock);
_skb = skb_peek_tail(list);
if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
(skb_queue_len(list) > 32))
rv = true;
__skb_queue_tail(list, skb);
spin_unlock_bh(&list->lock);
return rv;
}
/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
* @list: list to be appended to
* @skb: buffer to add
......@@ -926,4 +894,33 @@ static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
return false;
}
/* tipc_skb_queue_splice_tail - append an skb list to lock protected list
* @list: the new list to append. Not lock protected
* @head: target list. Lock protected.
*/
static inline void tipc_skb_queue_splice_tail(struct sk_buff_head *list,
struct sk_buff_head *head)
{
spin_lock_bh(&head->lock);
skb_queue_splice_tail(list, head);
spin_unlock_bh(&head->lock);
}
/* tipc_skb_queue_splice_tail_init - merge two lock protected skb lists
* @list: the new list to add. Lock protected. Will be reinitialized
* @head: target list. Lock protected.
*/
static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list,
struct sk_buff_head *head)
{
struct sk_buff_head tmp;
__skb_queue_head_init(&tmp);
spin_lock_bh(&list->lock);
skb_queue_splice_tail_init(list, &tmp);
spin_unlock_bh(&list->lock);
tipc_skb_queue_splice_tail(&tmp, head);
}
#endif
......@@ -42,7 +42,36 @@
#include "bcast.h"
#include "discover.h"
static void node_lost_contact(struct tipc_node *n_ptr);
/* Node FSM states and events:
*/
enum {
SELF_DOWN_PEER_DOWN = 0xdd,
SELF_UP_PEER_UP = 0xaa,
SELF_DOWN_PEER_LEAVING = 0xd1,
SELF_UP_PEER_COMING = 0xac,
SELF_COMING_PEER_UP = 0xca,
SELF_LEAVING_PEER_DOWN = 0x1d,
NODE_FAILINGOVER = 0xf0,
NODE_SYNCHING = 0xcc
};
enum {
SELF_ESTABL_CONTACT_EVT = 0xece,
SELF_LOST_CONTACT_EVT = 0x1ce,
PEER_ESTABL_CONTACT_EVT = 0x9ece,
PEER_LOST_CONTACT_EVT = 0x91ce,
NODE_FAILOVER_BEGIN_EVT = 0xfbe,
NODE_FAILOVER_END_EVT = 0xfee,
NODE_SYNCH_BEGIN_EVT = 0xcbe,
NODE_SYNCH_END_EVT = 0xcee
};
static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr **maddr);
static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
bool delete);
static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
static void tipc_node_timeout(unsigned long data);
......@@ -113,7 +142,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
return NULL;
}
struct tipc_node *tipc_node_create(struct net *net, u32 addr)
struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *n_ptr, *temp_node;
......@@ -129,6 +158,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
}
n_ptr->addr = addr;
n_ptr->net = net;
n_ptr->capabilities = capabilities;
kref_init(&n_ptr->kref);
spin_lock_init(&n_ptr->lock);
INIT_HLIST_NODE(&n_ptr->hash);
......@@ -249,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
static void tipc_node_timeout(unsigned long data)
{
struct tipc_node *n = (struct tipc_node *)data;
struct tipc_link_entry *le;
struct sk_buff_head xmitq;
struct tipc_link *l;
struct tipc_media_addr *maddr;
int bearer_id;
int rc = 0;
......@@ -259,17 +288,16 @@ static void tipc_node_timeout(unsigned long data)
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
tipc_node_lock(n);
l = n->links[bearer_id].link;
if (l) {
le = &n->links[bearer_id];
if (le->link) {
/* Link tolerance may change asynchronously: */
tipc_node_calculate_timer(n, l);
rc = tipc_link_timeout(l, &xmitq);
if (rc & TIPC_LINK_DOWN_EVT)
tipc_link_reset(l);
tipc_node_calculate_timer(n, le->link);
rc = tipc_link_timeout(le->link, &xmitq);
}
tipc_node_unlock(n);
maddr = &n->links[bearer_id].maddr;
tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
if (rc & TIPC_LINK_DOWN_EVT)
tipc_node_link_down(n, bearer_id, false);
}
if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
tipc_node_get(n);
......@@ -277,68 +305,92 @@ static void tipc_node_timeout(unsigned long data)
}
/**
* tipc_node_link_up - handle addition of link
*
* __tipc_node_link_up - handle addition of link
* Node lock must be held by caller
* Link becomes active (alone or shared) or standby, depending on its priority.
*/
void tipc_node_link_up(struct tipc_node *n, int bearer_id)
static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct sk_buff_head *xmitq)
{
int *slot0 = &n->active_links[0];
int *slot1 = &n->active_links[1];
struct tipc_link_entry *links = n->links;
struct tipc_link *l = n->links[bearer_id].link;
struct tipc_link *ol = node_active_link(n, 0);
struct tipc_link *nl = n->links[bearer_id].link;
/* Leave room for tunnel header when returning 'mtu' to users: */
links[bearer_id].mtu = l->mtu - INT_H_SIZE;
if (!nl || !tipc_link_is_up(nl))
return;
n->working_links++;
n->action_flags |= TIPC_NOTIFY_LINK_UP;
n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
n->link_id = nl->peer_bearer_id << 16 | bearer_id;
/* Leave room for tunnel header when returning 'mtu' to users: */
n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
tipc_bearer_add_dest(n->net, bearer_id, n->addr);
pr_debug("Established link <%s> on network plane %c\n",
l->name, l->net_plane);
nl->name, nl->net_plane);
/* No active links ? => take both active slots */
if (*slot0 < 0) {
/* First link? => give it both slots */
if (!ol) {
*slot0 = bearer_id;
*slot1 = bearer_id;
tipc_link_build_bcast_sync_msg(nl, xmitq);
node_established_contact(n);
return;
}
/* Lower prio than current active ? => no slot */
if (l->priority < links[*slot0].link->priority) {
pr_debug("New link <%s> becomes standby\n", l->name);
return;
}
tipc_link_dup_queue_xmit(links[*slot0].link, l);
/* Same prio as current active ? => take one slot */
if (l->priority == links[*slot0].link->priority) {
/* Second link => redistribute slots */
if (nl->priority > ol->priority) {
pr_debug("Old link <%s> becomes standby\n", ol->name);
*slot0 = bearer_id;
return;
*slot1 = bearer_id;
} else if (nl->priority == ol->priority) {
*slot0 = bearer_id;
} else {
pr_debug("New link <%s> is standby\n", nl->name);
}
/* Higher prio than current active => take both active slots */
pr_debug("Old link <%s> now standby\n", links[*slot0].link->name);
*slot0 = bearer_id;
*slot1 = bearer_id;
/* Prepare synchronization with first link */
tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
}
/**
* tipc_node_link_down - handle loss of link
* tipc_node_link_up - handle addition of link
*
* Link becomes active (alone or shared) or standby, depending on its priority.
*/
void tipc_node_link_down(struct tipc_node *n, int bearer_id)
static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
struct sk_buff_head *xmitq)
{
tipc_node_lock(n);
__tipc_node_link_up(n, bearer_id, xmitq);
tipc_node_unlock(n);
}
/**
* __tipc_node_link_down - handle loss of link
*/
static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr **maddr)
{
struct tipc_link_entry *le = &n->links[*bearer_id];
int *slot0 = &n->active_links[0];
int *slot1 = &n->active_links[1];
int i, highest = 0;
struct tipc_link *l, *_l;
struct tipc_link *l, *_l, *tnl;
l = n->links[*bearer_id].link;
if (!l || tipc_link_is_reset(l))
return;
l = n->links[bearer_id].link;
n->working_links--;
n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
n->link_id = l->peer_bearer_id << 16 | *bearer_id;
tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
pr_debug("Lost link <%s> on network plane %c\n",
l->name, l->net_plane);
......@@ -350,6 +402,8 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id)
_l = n->links[i].link;
if (!_l || !tipc_link_is_up(_l))
continue;
if (_l == l)
continue;
if (_l->priority < highest)
continue;
if (_l->priority > highest) {
......@@ -360,10 +414,43 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id)
}
*slot1 = i;
}
if (tipc_node_is_up(n))
tipc_link_failover_send_queue(l);
else
node_lost_contact(n);
if (!tipc_node_is_up(n)) {
tipc_link_reset(l);
node_lost_contact(n, &le->inputq);
return;
}
/* There is still a working link => initiate failover */
tnl = node_active_link(n, 0);
n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
tipc_link_reset(l);
tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
*maddr = &n->links[tnl->bearer_id].maddr;
*bearer_id = tnl->bearer_id;
}
static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
{
struct tipc_link_entry *le = &n->links[bearer_id];
struct tipc_media_addr *maddr;
struct sk_buff_head xmitq;
__skb_queue_head_init(&xmitq);
tipc_node_lock(n);
__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
if (delete && le->link) {
kfree(le->link);
le->link = NULL;
n->link_cnt--;
}
tipc_node_unlock(n);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
tipc_sk_rcv(n->net, &le->inputq);
}
bool tipc_node_is_up(struct tipc_node *n)
......@@ -371,55 +458,150 @@ bool tipc_node_is_up(struct tipc_node *n)
return n->active_links[0] != INVALID_BEARER_ID;
}
void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b,
bool *link_up, bool *addr_match,
struct tipc_media_addr *maddr)
void tipc_node_check_dest(struct net *net, u32 onode,
struct tipc_bearer *b,
u16 capabilities, u32 signature,
struct tipc_media_addr *maddr,
bool *respond, bool *dupl_addr)
{
struct tipc_link *l = n->links[b->identity].link;
struct tipc_media_addr *curr = &n->links[b->identity].maddr;
struct tipc_node *n;
struct tipc_link *l;
struct tipc_link_entry *le;
bool addr_match = false;
bool sign_match = false;
bool link_up = false;
bool accept_addr = false;
bool reset = true;
*dupl_addr = false;
*respond = false;
n = tipc_node_create(net, onode, capabilities);
if (!n)
return;
*link_up = l && tipc_link_is_up(l);
*addr_match = l && !memcmp(curr, maddr, sizeof(*maddr));
}
tipc_node_lock(n);
bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b,
struct tipc_media_addr *maddr)
{
struct tipc_link *l = n->links[b->identity].link;
struct tipc_media_addr *curr = &n->links[b->identity].maddr;
struct sk_buff_head *inputq = &n->links[b->identity].inputq;
le = &n->links[b->identity];
/* Prepare to validate requesting node's signature and media address */
l = le->link;
link_up = l && tipc_link_is_up(l);
addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
sign_match = (signature == n->signature);
/* These three flags give us eight permutations: */
if (sign_match && addr_match && link_up) {
/* All is fine. Do nothing. */
reset = false;
} else if (sign_match && addr_match && !link_up) {
/* Respond. The link will come up in due time */
*respond = true;
} else if (sign_match && !addr_match && link_up) {
/* Peer has changed i/f address without rebooting.
* If so, the link will reset soon, and the next
* discovery will be accepted. So we can ignore it.
* It may also be an cloned or malicious peer having
* chosen the same node address and signature as an
* existing one.
* Ignore requests until the link goes down, if ever.
*/
*dupl_addr = true;
} else if (sign_match && !addr_match && !link_up) {
/* Peer link has changed i/f address without rebooting.
* It may also be a cloned or malicious peer; we can't
* distinguish between the two.
* The signature is correct, so we must accept.
*/
accept_addr = true;
*respond = true;
} else if (!sign_match && addr_match && link_up) {
/* Peer node rebooted. Two possibilities:
* - Delayed re-discovery; this link endpoint has already
* reset and re-established contact with the peer, before
* receiving a discovery message from that node.
* (The peer happened to receive one from this node first).
* - The peer came back so fast that our side has not
* discovered it yet. Probing from this side will soon
* reset the link, since there can be no working link
* endpoint at the peer end, and the link will re-establish.
* Accept the signature, since it comes from a known peer.
*/
n->signature = signature;
} else if (!sign_match && addr_match && !link_up) {
/* The peer node has rebooted.
* Accept signature, since it is a known peer.
*/
n->signature = signature;
*respond = true;
} else if (!sign_match && !addr_match && link_up) {
/* Peer rebooted with new address, or a new/duplicate peer.
* Ignore until the link goes down, if ever.
*/
*dupl_addr = true;
} else if (!sign_match && !addr_match && !link_up) {
/* Peer rebooted with new address, or it is a new peer.
* Accept signature and address.
*/
n->signature = signature;
accept_addr = true;
*respond = true;
}
if (!accept_addr)
goto exit;
/* Now create new link if not already existing */
if (!l) {
l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq);
if (!l)
return false;
if (n->link_cnt == 2) {
pr_warn("Cannot establish 3rd link to %x\n", n->addr);
goto exit;
}
if (!tipc_link_create(n, b, mod(tipc_net(net)->random),
tipc_own_addr(net), onode, &le->maddr,
&le->inputq, &n->bclink.namedq, &l)) {
*respond = false;
goto exit;
}
tipc_link_reset(l);
le->link = l;
n->link_cnt++;
tipc_node_calculate_timer(n, l);
if (n->link_cnt == 1) {
if (n->link_cnt == 1)
if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
tipc_node_get(n);
}
}
memcpy(&l->media_addr, maddr, sizeof(*maddr));
memcpy(curr, maddr, sizeof(*maddr));
tipc_link_reset(l);
return true;
memcpy(&le->maddr, maddr, sizeof(*maddr));
exit:
tipc_node_unlock(n);
if (reset)
tipc_node_link_down(n, b->identity, false);
tipc_node_put(n);
}
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
void tipc_node_delete_links(struct net *net, int bearer_id)
{
n_ptr->links[l_ptr->bearer_id].link = l_ptr;
n_ptr->link_cnt++;
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *n;
rcu_read_lock();
list_for_each_entry_rcu(n, &tn->node_list, list) {
tipc_node_link_down(n, bearer_id, true);
}
rcu_read_unlock();
}
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
static void tipc_node_reset_links(struct tipc_node *n)
{
char addr_string[16];
int i;
pr_warn("Resetting all links to %s\n",
tipc_addr_string_fill(addr_string, n->addr));
for (i = 0; i < MAX_BEARERS; i++) {
if (l_ptr != n_ptr->links[i].link)
continue;
n_ptr->links[i].link = NULL;
n_ptr->link_cnt--;
tipc_node_link_down(n, i, false);
}
}
......@@ -442,8 +624,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case SELF_LOST_CONTACT_EVT:
case PEER_LOST_CONTACT_EVT:
break;
case NODE_SYNCH_END_EVT:
case NODE_SYNCH_BEGIN_EVT:
case NODE_FAILOVER_BEGIN_EVT:
case NODE_FAILOVER_END_EVT:
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case SELF_UP_PEER_UP:
......@@ -454,11 +640,19 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case PEER_LOST_CONTACT_EVT:
state = SELF_LEAVING_PEER_DOWN;
break;
case NODE_SYNCH_BEGIN_EVT:
state = NODE_SYNCHING;
break;
case NODE_FAILOVER_BEGIN_EVT:
state = NODE_FAILINGOVER;
break;
case SELF_ESTABL_CONTACT_EVT:
case PEER_ESTABL_CONTACT_EVT:
case NODE_SYNCH_END_EVT:
case NODE_FAILOVER_END_EVT:
break;
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case SELF_DOWN_PEER_LEAVING:
......@@ -470,8 +664,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case PEER_ESTABL_CONTACT_EVT:
case SELF_LOST_CONTACT_EVT:
break;
case NODE_SYNCH_END_EVT:
case NODE_SYNCH_BEGIN_EVT:
case NODE_FAILOVER_BEGIN_EVT:
case NODE_FAILOVER_END_EVT:
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case SELF_UP_PEER_COMING:
......@@ -485,8 +683,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case SELF_ESTABL_CONTACT_EVT:
case PEER_LOST_CONTACT_EVT:
break;
case NODE_SYNCH_END_EVT:
case NODE_SYNCH_BEGIN_EVT:
case NODE_FAILOVER_BEGIN_EVT:
case NODE_FAILOVER_END_EVT:
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case SELF_COMING_PEER_UP:
......@@ -500,8 +702,12 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case SELF_LOST_CONTACT_EVT:
case PEER_ESTABL_CONTACT_EVT:
break;
case NODE_SYNCH_END_EVT:
case NODE_SYNCH_BEGIN_EVT:
case NODE_FAILOVER_BEGIN_EVT:
case NODE_FAILOVER_END_EVT:
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case SELF_LEAVING_PEER_DOWN:
......@@ -513,49 +719,85 @@ static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
case PEER_ESTABL_CONTACT_EVT:
case PEER_LOST_CONTACT_EVT:
break;
case NODE_SYNCH_END_EVT:
case NODE_SYNCH_BEGIN_EVT:
case NODE_FAILOVER_BEGIN_EVT:
case NODE_FAILOVER_END_EVT:
default:
pr_err("Unknown node fsm evt %x/%x\n", state, evt);
goto illegal_evt;
}
break;
case NODE_FAILINGOVER:
switch (evt) {
case SELF_LOST_CONTACT_EVT:
state = SELF_DOWN_PEER_LEAVING;
break;
case PEER_LOST_CONTACT_EVT:
state = SELF_LEAVING_PEER_DOWN;
break;
case NODE_FAILOVER_END_EVT:
state = SELF_UP_PEER_UP;
break;
case NODE_FAILOVER_BEGIN_EVT:
case SELF_ESTABL_CONTACT_EVT:
case PEER_ESTABL_CONTACT_EVT:
break;
case NODE_SYNCH_BEGIN_EVT:
case NODE_SYNCH_END_EVT:
default:
goto illegal_evt;
}
break;
case NODE_SYNCHING:
switch (evt) {
case SELF_LOST_CONTACT_EVT:
state = SELF_DOWN_PEER_LEAVING;
break;
case PEER_LOST_CONTACT_EVT:
state = SELF_LEAVING_PEER_DOWN;
break;
case NODE_SYNCH_END_EVT:
state = SELF_UP_PEER_UP;
break;
case NODE_FAILOVER_BEGIN_EVT:
state = NODE_FAILINGOVER;
break;
case NODE_SYNCH_BEGIN_EVT:
case SELF_ESTABL_CONTACT_EVT:
case PEER_ESTABL_CONTACT_EVT:
break;
case NODE_FAILOVER_END_EVT:
default:
goto illegal_evt;
}
break;
default:
pr_err("Unknown node fsm state %x\n", state);
break;
}
n->state = state;
return;
illegal_evt:
pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
}
bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
struct tipc_msg *hdr)
bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
{
int state = n->state;
if (likely(state == SELF_UP_PEER_UP))
return true;
if (state == SELF_DOWN_PEER_DOWN)
return true;
if (state == SELF_UP_PEER_COMING) {
/* If not traffic msg, peer may still be ESTABLISHING */
if (tipc_link_is_up(l) && msg_is_traffic(hdr))
tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
return true;
}
if (state == SELF_COMING_PEER_UP)
return true;
if (state == SELF_LEAVING_PEER_DOWN)
return false;
if (state == SELF_DOWN_PEER_LEAVING) {
if (msg_peer_is_up(hdr))
if (msg_peer_node_is_up(hdr))
return false;
tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
return true;
}
return false;
return true;
}
static void node_established_contact(struct tipc_node *n_ptr)
......@@ -567,10 +809,12 @@ static void node_established_contact(struct tipc_node *n_ptr)
tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
}
static void node_lost_contact(struct tipc_node *n_ptr)
static void node_lost_contact(struct tipc_node *n_ptr,
struct sk_buff_head *inputq)
{
char addr_string[16];
struct tipc_sock_conn *conn, *safe;
struct tipc_link *l;
struct list_head *conns = &n_ptr->conn_sks;
struct sk_buff *skb;
struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
......@@ -596,16 +840,11 @@ static void node_lost_contact(struct tipc_node *n_ptr)
/* Abort any ongoing link failover */
for (i = 0; i < MAX_BEARERS; i++) {
struct tipc_link *l_ptr = n_ptr->links[i].link;
if (!l_ptr)
continue;
l_ptr->exec_mode = TIPC_LINK_OPEN;
l_ptr->failover_checkpt = 0;
l_ptr->failover_pkts = 0;
kfree_skb(l_ptr->failover_skb);
l_ptr->failover_skb = NULL;
tipc_link_reset_fragments(l_ptr);
l = n_ptr->links[i].link;
if (l)
tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
}
/* Prevent re-contact with node until cleanup is done */
tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
......@@ -618,10 +857,8 @@ static void node_lost_contact(struct tipc_node *n_ptr)
SHORT_H_SIZE, 0, tn->own_addr,
conn->peer_node, conn->port,
conn->peer_port, TIPC_ERR_NO_NODE);
if (likely(skb)) {
skb_queue_tail(n_ptr->inputq, skb);
n_ptr->action_flags |= TIPC_MSG_EVT;
}
if (likely(skb))
skb_queue_tail(inputq, skb);
list_del(&conn->list);
kfree(conn);
}
......@@ -668,27 +905,20 @@ void tipc_node_unlock(struct tipc_node *node)
u32 flags = node->action_flags;
u32 link_id = 0;
struct list_head *publ_list;
struct sk_buff_head *inputq = node->inputq;
struct sk_buff_head *namedq;
if (likely(!flags || (flags == TIPC_MSG_EVT))) {
node->action_flags = 0;
if (likely(!flags)) {
spin_unlock_bh(&node->lock);
if (flags == TIPC_MSG_EVT)
tipc_sk_rcv(net, inputq);
return;
}
addr = node->addr;
link_id = node->link_id;
namedq = node->namedq;
publ_list = &node->publ_list;
node->action_flags &= ~(TIPC_MSG_EVT |
TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET);
TIPC_BCAST_RESET);
spin_unlock_bh(&node->lock);
......@@ -709,17 +939,11 @@ void tipc_node_unlock(struct tipc_node *node)
tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
link_id, addr);
if (flags & TIPC_MSG_EVT)
tipc_sk_rcv(net, inputq);
if (flags & TIPC_NAMED_MSG_EVT)
tipc_named_rcv(net, namedq);
if (flags & TIPC_BCAST_MSG_EVT)
tipc_bclink_input(net);
if (flags & TIPC_BCAST_RESET)
tipc_link_reset_all(node);
tipc_node_reset_links(node);
}
/* Caller should hold node lock for the passed node */
......@@ -796,9 +1020,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
if (likely(l))
rc = tipc_link_xmit(l, list, &xmitq);
if (unlikely(rc == -ENOBUFS))
tipc_link_reset(l);
tipc_node_unlock(n);
if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false);
tipc_node_put(n);
}
if (likely(!rc)) {
......@@ -834,6 +1058,120 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
return 0;
}
/**
* tipc_node_check_state - check and if necessary update node state
* @skb: TIPC packet
* @bearer_id: identity of bearer delivering the packet
* Returns true if state is ok, otherwise consumes buffer and returns false
*/
static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
int bearer_id, struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
int usr = msg_user(hdr);
int mtyp = msg_type(hdr);
u16 oseqno = msg_seqno(hdr);
u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
u16 exp_pkts = msg_msgcnt(hdr);
u16 rcv_nxt, syncpt, dlv_nxt;
int state = n->state;
struct tipc_link *l, *pl = NULL;
struct tipc_media_addr *maddr;
int i, pb_id;
l = n->links[bearer_id].link;
if (!l)
return false;
rcv_nxt = l->rcv_nxt;
if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
return true;
/* Find parallel link, if any */
for (i = 0; i < MAX_BEARERS; i++) {
if ((i != bearer_id) && n->links[i].link) {
pl = n->links[i].link;
break;
}
}
/* Update node accesibility if applicable */
if (state == SELF_UP_PEER_COMING) {
if (!tipc_link_is_up(l))
return true;
if (!msg_peer_link_is_up(hdr))
return true;
tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
}
if (state == SELF_DOWN_PEER_LEAVING) {
if (msg_peer_node_is_up(hdr))
return false;
tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
}
/* Ignore duplicate packets */
if (less(oseqno, rcv_nxt))
return true;
/* Initiate or update failover mode if applicable */
if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
syncpt = oseqno + exp_pkts - 1;
if (pl && tipc_link_is_up(pl)) {
pb_id = pl->bearer_id;
__tipc_node_link_down(n, &pb_id, xmitq, &maddr);
tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
}
/* If pkts arrive out of order, use lowest calculated syncpt */
if (less(syncpt, n->sync_point))
n->sync_point = syncpt;
}
/* Open parallel link when tunnel link reaches synch point */
if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) {
if (!more(rcv_nxt, n->sync_point))
return true;
tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
if (pl)
tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
return true;
}
/* Initiate or update synch mode if applicable */
if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) {
syncpt = iseqno + exp_pkts - 1;
if (!tipc_link_is_up(l)) {
tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
__tipc_node_link_up(n, bearer_id, xmitq);
}
if (n->state == SELF_UP_PEER_UP) {
n->sync_point = syncpt;
tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
}
if (less(syncpt, n->sync_point))
n->sync_point = syncpt;
}
/* Open tunnel link when parallel link reaches synch point */
if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) {
if (pl)
dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq));
if (!pl || more(dlv_nxt, n->sync_point)) {
tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT);
tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
return true;
}
if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
return true;
if (usr == LINK_PROTOCOL)
return true;
return false;
}
return true;
}
/**
* tipc_rcv - process TIPC packets/messages arriving from off-node
* @net: the applicable net namespace
......@@ -847,10 +1185,10 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
{
struct sk_buff_head xmitq;
struct tipc_node *n;
struct tipc_link *l;
struct tipc_msg *hdr;
struct tipc_media_addr *maddr;
struct tipc_msg *hdr = buf_msg(skb);
int usr = msg_user(hdr);
int bearer_id = b->identity;
struct tipc_link_entry *le;
int rc = 0;
__skb_queue_head_init(&xmitq);
......@@ -860,9 +1198,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
goto discard;
/* Handle arrival of a non-unicast link packet */
hdr = buf_msg(skb);
if (unlikely(msg_non_seq(hdr))) {
if (msg_user(hdr) == LINK_CONFIG)
if (usr == LINK_CONFIG)
tipc_disc_rcv(net, skb, b);
else
tipc_bclink_rcv(net, skb);
......@@ -873,38 +1210,44 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
n = tipc_node_find(net, msg_prevnode(hdr));
if (unlikely(!n))
goto discard;
tipc_node_lock(n);
le = &n->links[bearer_id];
/* Locate link endpoint that should handle packet */
l = n->links[bearer_id].link;
if (unlikely(!l))
goto unlock;
tipc_node_lock(n);
/* Is reception of this packet permitted at the moment ? */
if (unlikely(n->state != SELF_UP_PEER_UP))
if (!tipc_node_filter_skb(n, l, hdr))
/* Is reception permitted at the moment ? */
if (!tipc_node_filter_pkt(n, hdr))
goto unlock;
if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
tipc_bclink_sync_state(n, hdr);
/* Release acked broadcast messages */
/* Release acked broadcast packets */
if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
/* Check protocol and update link state */
rc = tipc_link_rcv(l, skb, &xmitq);
if (unlikely(rc & TIPC_LINK_UP_EVT))
tipc_link_activate(l);
if (unlikely(rc & TIPC_LINK_DOWN_EVT))
tipc_link_reset(l);
/* Check and if necessary update node state */
if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
rc = tipc_link_rcv(le->link, skb, &xmitq);
skb = NULL;
}
unlock:
tipc_node_unlock(n);
tipc_sk_rcv(net, &n->links[bearer_id].inputq);
maddr = &n->links[bearer_id].maddr;
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
if (unlikely(rc & TIPC_LINK_UP_EVT))
tipc_node_link_up(n, bearer_id, &xmitq);
if (unlikely(rc & TIPC_LINK_DOWN_EVT))
tipc_node_link_down(n, bearer_id, false);
if (unlikely(!skb_queue_empty(&n->bclink.namedq)))
tipc_named_rcv(net, &n->bclink.namedq);
if (!skb_queue_empty(&le->inputq))
tipc_sk_rcv(net, &le->inputq);
if (!skb_queue_empty(&xmitq))
tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
tipc_node_put(n);
discard:
kfree_skb(skb);
......
......@@ -47,39 +47,17 @@
#define INVALID_BEARER_ID -1
/* Node FSM states and events:
*/
enum {
SELF_DOWN_PEER_DOWN = 0xdd,
SELF_UP_PEER_UP = 0xaa,
SELF_DOWN_PEER_LEAVING = 0xd1,
SELF_UP_PEER_COMING = 0xac,
SELF_COMING_PEER_UP = 0xca,
SELF_LEAVING_PEER_DOWN = 0x1d,
};
enum {
SELF_ESTABL_CONTACT_EVT = 0xec,
SELF_LOST_CONTACT_EVT = 0x1c,
PEER_ESTABL_CONTACT_EVT = 0xfec,
PEER_LOST_CONTACT_EVT = 0xf1c
};
/* Flags used to take different actions according to flag type
* TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
* TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
* TIPC_NOTIFY_NODE_DOWN: notify node is down
* TIPC_NOTIFY_NODE_UP: notify node is up
* TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
*/
enum {
TIPC_MSG_EVT = 1,
TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4),
TIPC_WAKEUP_BCAST_USERS = (1 << 5),
TIPC_NOTIFY_LINK_UP = (1 << 6),
TIPC_NOTIFY_LINK_DOWN = (1 << 7),
TIPC_NAMED_MSG_EVT = (1 << 8),
TIPC_BCAST_MSG_EVT = (1 << 9),
TIPC_BCAST_RESET = (1 << 10)
};
......@@ -127,6 +105,8 @@ struct tipc_link_entry {
* @links: array containing references to all links to node
* @action_flags: bit mask of different types of node actions
* @bclink: broadcast-related info
* @state: connectivity state vs peer node
* @sync_point: sequence number where synch/failover is finished
* @list: links to adjacent nodes in sorted list of cluster's nodes
* @working_links: number of working links to node (both active and standby)
* @link_cnt: number of links to node
......@@ -142,14 +122,13 @@ struct tipc_node {
spinlock_t lock;
struct net *net;
struct hlist_node hash;
struct sk_buff_head *inputq;
struct sk_buff_head *namedq;
int active_links[2];
struct tipc_link_entry links[MAX_BEARERS];
int action_flags;
struct tipc_node_bclink bclink;
struct list_head list;
int state;
u16 sync_point;
int link_cnt;
u16 working_links;
u16 capabilities;
......@@ -164,17 +143,15 @@ struct tipc_node {
struct tipc_node *tipc_node_find(struct net *net, u32 addr);
void tipc_node_put(struct tipc_node *node);
struct tipc_node *tipc_node_create(struct net *net, u32 addr);
void tipc_node_stop(struct net *net);
void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *bearer,
bool *link_up, bool *addr_match,
struct tipc_media_addr *maddr);
bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
struct tipc_media_addr *maddr);
void tipc_node_check_dest(struct net *net, u32 onode,
struct tipc_bearer *bearer,
u16 capabilities, u32 signature,
struct tipc_media_addr *maddr,
bool *respond, bool *dupl_addr);
void tipc_node_delete_links(struct net *net, int bearer_id);
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);
void tipc_node_link_up(struct tipc_node *n_ptr, int bearer_id);
bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
char *linkname, size_t len);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment