Commit d999297c authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller

tipc: reduce locking scope during packet reception

We convert packet/message reception according to the same principle
we have been using for message sending and timeout handling:

We move the function tipc_rcv() to node.c, hence handling the initial
packet reception at the link aggregation level. The function grabs
the node lock, selects the receiving link, and accesses it via a new
call tipc_link_rcv(). This function appends buffers to the input
queue for delivery upwards, but it may also append outgoing packets
to the xmit queue, just as we do during regular message sending. The
latter will happen when buffers are forwarded from the link backlog,
or when retransmission is requested.

Upon return of this function, and after having released the node lock,
tipc_rcv() delivers/tranmsits the contents of those queues, but it may
also perform actions such as link activation or reset, as indicated by
the return flags from the link.

This reduces the number of cpu cycles spent inside the node spinlock,
and reduces contention on that lock.
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 1a20cc25
......@@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
}
}
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
u16 last = msg_last_bcast(hdr);
int mtyp = msg_type(hdr);
if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
return;
if (mtyp == STATE_MSG) {
tipc_bclink_update_link_state(n, last);
return;
}
/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
* and transfer synch info in LINK_PROTOCOL messages.
*/
if (tipc_node_is_up(n))
return;
if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
return;
n->bclink.last_sent = last;
n->bclink.last_in = last;
n->bclink.oos_state = 0;
}
/**
* bclink_peek_nack - monitor retransmission requests sent by other nodes
*
......
......@@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net);
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
#endif
......@@ -129,6 +129,11 @@ static inline int less(u16 left, u16 right)
return less_eq(left, right) && (mod(right) != mod(left));
}
static inline int in_range(u16 val, u16 min, u16 max)
{
return !less(val, min) && !more(val, max);
}
#ifdef CONFIG_SYSCTL
int tipc_register_sysctl(void);
void tipc_unregister_sysctl(void);
......
......@@ -76,6 +76,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
[TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
};
/*
* Interval between NACKs when packets arrive out of order
*/
#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2)
/*
* Out-of-range value for link session numbers
*/
......@@ -123,22 +127,19 @@ static int link_establishing(struct tipc_link *l)
return l->state == TIPC_LINK_ESTABLISHING;
}
static void link_handle_out_of_seq_msg(struct tipc_link *link,
struct sk_buff *skb);
static void tipc_link_proto_rcv(struct tipc_link *link,
struct sk_buff *skb);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
static void link_activate(struct tipc_link *link);
/*
* Simple link routines
......@@ -283,6 +284,26 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
rcu_read_unlock();
}
/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
*
* Give a newly added peer node the sequence number where it should
* start receiving and acking broadcast packets.
*/
static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb;
struct sk_buff_head list;
skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
0, l->addr, link_own_addr(l), 0, 0, 0);
if (!skb)
return;
__skb_queue_head_init(&list);
__skb_queue_tail(&list, skb);
tipc_link_xmit(l, &list, xmitq);
}
/**
* tipc_link_fsm_evt - link finite state machine
* @l: pointer to link
......@@ -300,7 +321,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
SND_PROBE = (1 << 2),
SND_STATE = (1 << 3),
SND_RESET = (1 << 4),
SND_ACTIVATE = (1 << 5)
SND_ACTIVATE = (1 << 5),
SND_BCAST_SYNC = (1 << 6)
} actions = 0;
if (l->exec_mode == TIPC_LINK_BLOCKED)
......@@ -352,8 +374,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (pl && link_probing(pl))
break;
actions |= LINK_ACTIVATE;
if (l->owner->working_links == 1)
tipc_link_sync_xmit(l);
if (!l->owner->working_links)
actions |= SND_BCAST_SYNC;
break;
case PEER_RESET_EVT:
l->state = TIPC_LINK_ESTABLISHING;
......@@ -374,8 +396,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (pl && link_probing(pl))
break;
actions |= LINK_ACTIVATE;
if (l->owner->working_links == 1)
tipc_link_sync_xmit(l);
if (!l->owner->working_links)
actions |= SND_BCAST_SYNC;
break;
case PEER_RESET_EVT:
break;
......@@ -408,6 +430,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
0, 0, 0, xmitq);
if (actions & SND_BCAST_SYNC)
tipc_link_build_bcast_sync_msg(l, xmitq);
return rc;
}
......@@ -605,12 +629,14 @@ void tipc_link_reset(struct tipc_link *l_ptr)
l_ptr->reasm_buf = NULL;
l_ptr->rcv_unacked = 0;
l_ptr->snd_nxt = 1;
l_ptr->rcv_nxt = 1;
l_ptr->silent_intv_cnt = 0;
l_ptr->stats.recv_info = 0;
l_ptr->stale_count = 0;
link_reset_statistics(l_ptr);
}
static void link_activate(struct tipc_link *link)
void tipc_link_activate(struct tipc_link *link)
{
struct tipc_node *node = link->owner;
......@@ -623,36 +649,6 @@ static void link_activate(struct tipc_link *link)
tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
}
/**
* link_state_event - link finite state machine
* @l_ptr: pointer to link
* @event: state machine event to process
*/
static void link_state_event(struct tipc_link *l, unsigned int evt)
{
int rc;
struct sk_buff_head xmitq;
struct sk_buff *skb;
if (l->exec_mode == TIPC_LINK_BLOCKED)
return;
__skb_queue_head_init(&xmitq);
rc = tipc_link_fsm_evt(l, evt, &xmitq);
if (rc & TIPC_LINK_UP_EVT)
link_activate(l);
if (rc & TIPC_LINK_DOWN_EVT)
tipc_link_reset(l);
skb = __skb_dequeue(&xmitq);
if (!skb)
return;
tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
}
/**
* __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
* @link: link to use
......@@ -807,30 +803,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
return __tipc_link_xmit(link->owner->net, link, &head);
}
/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*
* Give a newly added peer node the sequence number where it should
* start receiving and acking broadcast packets.
*
* Called with node locked
*/
static void tipc_link_sync_xmit(struct tipc_link *link)
{
struct sk_buff *skb;
struct tipc_msg *msg;
skb = tipc_buf_acquire(INT_H_SIZE);
if (!skb)
return;
msg = buf_msg(skb);
tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
INT_H_SIZE, link->addr);
msg_set_last_bcast(msg, link->owner->bclink.acked);
__tipc_link_xmit_skb(link, skb);
}
/*
* tipc_link_sync_rcv - synchronize broadcast link endpoints.
* Receive the sequence number where we should start receiving and
......@@ -881,6 +853,34 @@ void tipc_link_push_packets(struct tipc_link *link)
link->snd_nxt = seqno;
}
void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
{
struct sk_buff *skb, *_skb;
struct tipc_msg *hdr;
u16 seqno = l->snd_nxt;
u16 ack = l->rcv_nxt - 1;
while (skb_queue_len(&l->transmq) < l->window) {
skb = skb_peek(&l->backlogq);
if (!skb)
break;
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb)
break;
__skb_dequeue(&l->backlogq);
hdr = buf_msg(skb);
l->backlog[msg_importance(hdr)].len--;
__skb_queue_tail(&l->transmq, skb);
__skb_queue_tail(xmitq, _skb);
msg_set_ack(hdr, ack);
msg_set_seqno(hdr, seqno);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
l->rcv_unacked = 0;
seqno++;
}
l->snd_nxt = seqno;
}
void tipc_link_reset_all(struct tipc_node *node)
{
char addr_string[16];
......@@ -978,6 +978,41 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
}
}
static int tipc_link_retransm(struct tipc_link *l, int retransm,
struct sk_buff_head *xmitq)
{
struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
struct tipc_msg *hdr;
if (!skb)
return 0;
/* Detect repeated retransmit failures on same packet */
if (likely(l->last_retransm != buf_seqno(skb))) {
l->last_retransm = buf_seqno(skb);
l->stale_count = 1;
} else if (++l->stale_count > 100) {
link_retransmit_failure(l, skb);
return TIPC_LINK_DOWN_EVT;
}
skb_queue_walk(&l->transmq, skb) {
if (!retransm)
return 0;
hdr = buf_msg(skb);
_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
if (!_skb)
return 0;
hdr = buf_msg(_skb);
msg_set_ack(hdr, l->rcv_nxt - 1);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
_skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb);
retransm--;
l->stats.retransmitted++;
}
return 0;
}
/* link_synch(): check if all packets arrived before the synch
* point have been consumed
* Returns true if the parallel links are synched, otherwise false
......@@ -1004,155 +1039,6 @@ static bool link_synch(struct tipc_link *l)
return true;
}
static void link_retrieve_defq(struct tipc_link *link,
struct sk_buff_head *list)
{
u16 seq_no;
if (skb_queue_empty(&link->deferdq))
return;
seq_no = buf_seqno(skb_peek(&link->deferdq));
if (seq_no == link->rcv_nxt)
skb_queue_splice_tail_init(&link->deferdq, list);
}
/**
* tipc_rcv - process TIPC packets/messages arriving from off-node
* @net: the applicable net namespace
* @skb: TIPC packet
* @b_ptr: pointer to bearer message arrived on
*
* Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff_head head;
struct tipc_node *n_ptr;
struct tipc_link *l_ptr;
struct sk_buff *skb1, *tmp;
struct tipc_msg *msg;
u16 seq_no;
u16 ackd;
u32 released;
skb2list(skb, &head);
while ((skb = __skb_dequeue(&head))) {
/* Ensure message is well-formed */
if (unlikely(!tipc_msg_validate(skb)))
goto discard;
/* Handle arrival of a non-unicast link message */
msg = buf_msg(skb);
if (unlikely(msg_non_seq(msg))) {
if (msg_user(msg) == LINK_CONFIG)
tipc_disc_rcv(net, skb, b_ptr);
else
tipc_bclink_rcv(net, skb);
continue;
}
/* Discard unicast link messages destined for another node */
if (unlikely(!msg_short(msg) &&
(msg_destnode(msg) != tn->own_addr)))
goto discard;
/* Locate neighboring node that sent message */
n_ptr = tipc_node_find(net, msg_prevnode(msg));
if (unlikely(!n_ptr))
goto discard;
tipc_node_lock(n_ptr);
/* Locate unicast link endpoint that should handle message */
l_ptr = n_ptr->links[b_ptr->identity].link;
if (unlikely(!l_ptr))
goto unlock;
/* Is reception of this pkt permitted at the moment ? */
if (!tipc_node_filter_skb(n_ptr, msg))
goto unlock;
/* Validate message sequence number info */
seq_no = msg_seqno(msg);
ackd = msg_ack(msg);
/* Release acked messages */
if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
released = 0;
skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
if (more(buf_seqno(skb1), ackd))
break;
__skb_unlink(skb1, &l_ptr->transmq);
kfree_skb(skb1);
released = 1;
}
/* Try sending any messages link endpoint has pending */
if (unlikely(skb_queue_len(&l_ptr->backlogq)))
tipc_link_push_packets(l_ptr);
if (released && !skb_queue_empty(&l_ptr->wakeupq))
link_prepare_wakeup(l_ptr);
/* Process the incoming packet */
if (unlikely(!link_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) {
tipc_link_proto_rcv(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
skb = NULL;
goto unlock;
}
/* Traffic message. Conditionally activate link */
link_state_event(l_ptr, TRAFFIC_EVT);
if (link_working(l_ptr)) {
/* Re-insert buffer in front of queue */
__skb_queue_head(&head, skb);
skb = NULL;
goto unlock;
}
goto unlock;
}
/* Link is now in state TIPC_LINK_WORKING */
if (unlikely(seq_no != l_ptr->rcv_nxt)) {
link_handle_out_of_seq_msg(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
skb = NULL;
goto unlock;
}
l_ptr->silent_intv_cnt = 0;
/* Synchronize with parallel link if applicable */
if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) &&
!msg_dup(msg))) {
if (!link_synch(l_ptr))
goto unlock;
}
l_ptr->rcv_nxt++;
if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
link_retrieve_defq(l_ptr, &head);
if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
l_ptr->stats.sent_acks++;
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
}
tipc_link_input(l_ptr, skb);
skb = NULL;
unlock:
tipc_node_unlock(n_ptr);
tipc_node_put(n_ptr);
discard:
if (unlikely(skb))
kfree_skb(skb);
}
}
/* tipc_data_input - deliver data and name distr msgs to upper layer
*
* Consumes buffer if message is of right type
......@@ -1206,9 +1092,6 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
struct sk_buff *iskb;
int pos = 0;
if (likely(tipc_data_input(link, skb)))
return;
switch (msg_user(msg)) {
case TUNNEL_PROTOCOL:
if (msg_dup(msg)) {
......@@ -1247,6 +1130,110 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
};
}
static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
{
bool released = false;
struct sk_buff *skb, *tmp;
skb_queue_walk_safe(&l->transmq, skb, tmp) {
if (more(buf_seqno(skb), acked))
break;
__skb_unlink(skb, &l->transmq);
kfree_skb(skb);
released = true;
}
return released;
}
/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
* @link: the link that should handle the message
* @skb: TIPC packet
* @xmitq: queue to place packets to be sent after this call
*/
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct sk_buff_head *arrvq = &l->deferdq;
struct sk_buff *tmp;
struct tipc_msg *hdr;
u16 seqno, rcv_nxt;
int rc = 0;
if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
tipc_link_build_proto_msg(l, STATE_MSG, 0,
0, 0, 0, xmitq);
return rc;
}
skb_queue_walk_safe(arrvq, skb, tmp) {
hdr = buf_msg(skb);
/* Verify and update link state */
if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
__skb_dequeue(arrvq);
rc |= tipc_link_proto_rcv(l, skb, xmitq);
continue;
}
if (unlikely(!link_working(l))) {
rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
if (!link_working(l)) {
kfree_skb(__skb_dequeue(arrvq));
return rc;
}
}
l->silent_intv_cnt = 0;
/* Forward queues and wake up waiting users */
if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq)))
link_prepare_wakeup(l);
}
/* Defer reception if there is a gap in the sequence */
seqno = msg_seqno(hdr);
rcv_nxt = l->rcv_nxt;
if (unlikely(less(rcv_nxt, seqno))) {
l->stats.deferred_recv++;
return rc;
}
__skb_dequeue(arrvq);
/* Drop if packet already received */
if (unlikely(more(rcv_nxt, seqno))) {
l->stats.duplicates++;
kfree_skb(skb);
return rc;
}
/* Synchronize with parallel link if applicable */
if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
if (!msg_dup(hdr) && !link_synch(l)) {
kfree_skb(skb);
return rc;
}
/* Packet can be delivered */
l->rcv_nxt++;
l->stats.recv_info++;
if (unlikely(!tipc_data_input(l, skb)))
tipc_link_input(l, skb);
/* Ack at regular intervals */
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
l->rcv_unacked = 0;
l->stats.sent_acks++;
tipc_link_build_proto_msg(l, STATE_MSG,
0, 0, 0, 0, xmitq);
}
}
return rc;
}
/**
* tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
*
......@@ -1286,41 +1273,6 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
return 1;
}
/*
* link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
*/
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
struct sk_buff *buf)
{
u32 seq_no = buf_seqno(buf);
if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
tipc_link_proto_rcv(l_ptr, buf);
return;
}
/* Record OOS packet arrival */
l_ptr->silent_intv_cnt = 0;
/*
* Discard packet if a duplicate; otherwise add it to deferred queue
* and notify peer of gap as per protocol specification
*/
if (less(seq_no, l_ptr->rcv_nxt)) {
l_ptr->stats.duplicates++;
kfree_skb(buf);
return;
}
if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
l_ptr->stats.deferred_recv++;
if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
} else {
l_ptr->stats.duplicates++;
}
}
/*
* Send protocol message to the other endpoint.
*/
......@@ -1341,119 +1293,6 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
kfree_skb(skb);
}
/*
* Receive protocol message :
* Note that network plane id propagates through the network, and may
* change at any time. The node with lowest address rules
*/
static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
struct sk_buff *buf)
{
u32 rec_gap = 0;
u32 msg_tol;
struct tipc_msg *msg = buf_msg(buf);
if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
goto exit;
if (l_ptr->net_plane != msg_net_plane(msg))
if (link_own_addr(l_ptr) > msg_prevnode(msg))
l_ptr->net_plane = msg_net_plane(msg);
switch (msg_type(msg)) {
case RESET_MSG:
if (!link_probing(l_ptr) &&
(l_ptr->peer_session != WILDCARD_SESSION)) {
if (less_eq(msg_session(msg), l_ptr->peer_session))
break; /* duplicate or old reset: ignore */
}
link_state_event(l_ptr, RESET_MSG);
/* fall thru' */
case ACTIVATE_MSG:
/* Update link settings according other endpoint's values */
strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
msg_tol = msg_link_tolerance(msg);
if (msg_tol > l_ptr->tolerance)
l_ptr->tolerance = msg_tol;
if (msg_linkprio(msg) > l_ptr->priority)
l_ptr->priority = msg_linkprio(msg);
if (l_ptr->mtu > msg_max_pkt(msg))
l_ptr->mtu = msg_max_pkt(msg);
/* Synchronize broadcast link info, if not done previously */
if (!tipc_node_is_up(l_ptr->owner)) {
l_ptr->owner->bclink.last_sent =
l_ptr->owner->bclink.last_in =
msg_last_bcast(msg);
l_ptr->owner->bclink.oos_state = 0;
}
l_ptr->peer_session = msg_session(msg);
l_ptr->peer_bearer_id = msg_bearer_id(msg);
if (!msg_peer_is_up(msg))
tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT);
if (msg_type(msg) == ACTIVATE_MSG)
link_state_event(l_ptr, ACTIVATE_MSG);
break;
case STATE_MSG:
msg_tol = msg_link_tolerance(msg);
if (msg_tol)
l_ptr->tolerance = msg_tol;
if (msg_linkprio(msg) &&
(msg_linkprio(msg) != l_ptr->priority)) {
pr_info("%s<%s>, priority change %u->%u\n",
link_rst_msg, l_ptr->name,
l_ptr->priority, msg_linkprio(msg));
l_ptr->priority = msg_linkprio(msg);
tipc_link_reset(l_ptr);
break;
}
/* Record reception; force mismatch at next timeout: */
l_ptr->silent_intv_cnt = 0;
link_state_event(l_ptr, TRAFFIC_EVT);
l_ptr->stats.recv_states++;
if (link_resetting(l_ptr))
break;
if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);
if (msg_probe(msg))
l_ptr->stats.recv_probes++;
/* Protocol message before retransmits, reduce loss risk */
if (l_ptr->owner->bclink.recv_permitted)
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));
if (rec_gap || (msg_probe(msg)))
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
rec_gap, 0, 0);
if (msg_seq_gap(msg)) {
l_ptr->stats.recv_nacks++;
tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
msg_seq_gap(msg));
}
if (tipc_link_is_up(l_ptr))
tipc_node_fsm_evt(l_ptr->owner,
PEER_ESTABL_CONTACT_EVT);
break;
}
exit:
kfree_skb(buf);
}
/* tipc_link_build_proto_msg: prepare link protocol message for transmission
*/
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
......@@ -1727,6 +1566,96 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
return *skb;
}
/* tipc_link_proto_rcv(): receive link level protocol message :
* Note that network plane id propagates through the network, and may
* change at any time. The node with lowest numerical id determines
* network plane
*/
static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb);
u16 rcvgap = 0;
u16 nacked_gap = msg_seq_gap(hdr);
u16 peers_snd_nxt = msg_next_sent(hdr);
u16 peers_tol = msg_link_tolerance(hdr);
u16 peers_prio = msg_linkprio(hdr);
char *if_name;
int rc = 0;
if (l->exec_mode == TIPC_LINK_BLOCKED)
goto exit;
if (link_own_addr(l) > msg_prevnode(hdr))
l->net_plane = msg_net_plane(hdr);
switch (msg_type(hdr)) {
case RESET_MSG:
/* Ignore duplicate RESET with old session number */
if ((less_eq(msg_session(hdr), l->peer_session)) &&
(l->peer_session != WILDCARD_SESSION))
break;
/* fall thru' */
case ACTIVATE_MSG:
/* Complete own link name with peer's interface name */
if_name = strrchr(l->name, ':') + 1;
if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)
break;
if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
break;
strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);
/* Update own tolerance if peer indicates a non-zero value */
if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
l->tolerance = peers_tol;
/* Update own priority if peer's priority is higher */
if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
l->priority = peers_prio;
l->peer_session = msg_session(hdr);
l->peer_bearer_id = msg_bearer_id(hdr);
rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
if (l->mtu > msg_max_pkt(hdr))
l->mtu = msg_max_pkt(hdr);
break;
case STATE_MSG:
/* Update own tolerance if peer indicates a non-zero value */
if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
l->tolerance = peers_tol;
l->silent_intv_cnt = 0;
l->stats.recv_states++;
if (msg_probe(hdr))
l->stats.recv_probes++;
rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
if (!tipc_link_is_up(l))
break;
/* Has peer sent packets we haven't received yet ? */
if (more(peers_snd_nxt, l->rcv_nxt))
rcvgap = peers_snd_nxt - l->rcv_nxt;
if (rcvgap || (msg_probe(hdr)))
tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
0, l->mtu, xmitq);
tipc_link_release_pkts(l, msg_ack(hdr));
/* If NACK, retransmit will now start at right position */
if (nacked_gap) {
rc |= tipc_link_retransm(l, nacked_gap, xmitq);
l->stats.recv_nacks++;
}
tipc_link_advance_backlog(l, xmitq);
if (unlikely(!skb_queue_empty(&l->wakeupq)))
link_prepare_wakeup(l);
}
exit:
kfree_skb(skb);
return rc;
}
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
......
......@@ -58,7 +58,7 @@ enum {
TIPC_LINK_TUNNEL
};
/* Events occurring at packet reception or at timeout
/* Events returned from link at packet reception or at timeout
*/
enum {
TIPC_LINK_UP_EVT = 1,
......@@ -223,6 +223,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_activate(struct tipc_link *link);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
......@@ -244,7 +245,8 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
void link_prepare_wakeup(struct tipc_link *l);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
static inline u32 link_own_addr(struct tipc_link *l)
{
return msg_prevnode(l->pmsg);
......
......@@ -38,6 +38,7 @@
#define _TIPC_MSG_H
#include <linux/tipc.h>
#include "core.h"
/*
* Constants and routines used to read and write TIPC payload message headers
......@@ -658,12 +659,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
/*
* Word 5
*/
static inline u32 msg_session(struct tipc_msg *m)
static inline u16 msg_session(struct tipc_msg *m)
{
return msg_bits(m, 5, 16, 0xffff);
}
static inline void msg_set_session(struct tipc_msg *m, u32 n)
static inline void msg_set_session(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 5, 16, 0xffff, n);
}
......@@ -766,10 +767,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
static inline bool msg_peer_is_up(struct tipc_msg *m)
static inline bool msg_is_traffic(struct tipc_msg *m)
{
if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
if (likely(msg_user(m) != LINK_PROTOCOL))
return true;
if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
return false;
return true;
}
static inline bool msg_peer_is_up(struct tipc_msg *m)
{
if (likely(msg_is_traffic(m)))
return false;
return msg_redundant_link(m);
}
......@@ -886,4 +896,36 @@ static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
return rv;
}
/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
* @list: list to be appended to
* @skb: buffer to add
* Returns true if queue should treated further, otherwise false
*/
static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
struct sk_buff *skb)
{
struct sk_buff *_skb, *tmp;
struct tipc_msg *hdr = buf_msg(skb);
u16 seqno = msg_seqno(hdr);
if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) {
__skb_queue_head(list, skb);
return true;
}
if (likely(less(seqno, buf_seqno(skb_peek(list))))) {
__skb_queue_head(list, skb);
return true;
}
if (!more(seqno, buf_seqno(skb_peek_tail(list)))) {
skb_queue_walk_safe(list, _skb, tmp) {
if (likely(less(seqno, buf_seqno(_skb)))) {
__skb_queue_before(list, _skb, skb);
return true;
}
}
}
__skb_queue_tail(list, skb);
return false;
}
#endif
......@@ -40,11 +40,13 @@
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "discover.h"
static void node_lost_contact(struct tipc_node *n_ptr);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
static void tipc_node_timeout(unsigned long data);
static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
struct tipc_sock_conn {
u32 port;
......@@ -141,7 +143,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
break;
}
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
n_ptr->state = SELF_DOWN_PEER_DOWN;
n_ptr->state = SELF_DOWN_PEER_LEAVING;
n_ptr->signature = INVALID_NODE_SIG;
n_ptr->active_links[0] = INVALID_BEARER_ID;
n_ptr->active_links[1] = INVALID_BEARER_ID;
......@@ -424,7 +426,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
/* tipc_node_fsm_evt - node finite state machine
* Determines when contact is allowed with peer node
*/
void tipc_node_fsm_evt(struct tipc_node *n, int evt)
static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
{
int state = n->state;
......@@ -523,23 +525,36 @@ void tipc_node_fsm_evt(struct tipc_node *n, int evt)
n->state = state;
}
bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr)
bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
struct tipc_msg *hdr)
{
int state = n->state;
if (likely(state == SELF_UP_PEER_UP))
return true;
if (state == SELF_DOWN_PEER_DOWN)
return true;
if (state == SELF_UP_PEER_COMING)
if (state == SELF_UP_PEER_COMING) {
/* If not traffic msg, peer may still be ESTABLISHING */
if (tipc_link_is_up(l) && msg_is_traffic(hdr))
tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
return true;
}
if (state == SELF_COMING_PEER_UP)
return true;
if (state == SELF_LEAVING_PEER_DOWN)
return false;
if (state == SELF_DOWN_PEER_LEAVING)
if (!msg_peer_is_up(hdr))
if (state == SELF_DOWN_PEER_LEAVING) {
if (msg_peer_is_up(hdr))
return false;
tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
return true;
}
return false;
}
......@@ -819,6 +834,82 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
return 0;
}
/**
* tipc_rcv - process TIPC packets/messages arriving from off-node
* @net: the applicable net namespace
* @skb: TIPC packet
* @bearer: pointer to bearer message arrived on
*
* Invoked with no locks held. Bearer pointer must point to a valid bearer
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
{
struct sk_buff_head xmitq;
struct tipc_node *n;
struct tipc_link *l;
struct tipc_msg *hdr;
struct tipc_media_addr *maddr;
int bearer_id = b->identity;
int rc = 0;
__skb_queue_head_init(&xmitq);
/* Ensure message is well-formed */
if (unlikely(!tipc_msg_validate(skb)))
goto discard;
/* Handle arrival of a non-unicast link packet */
hdr = buf_msg(skb);
if (unlikely(msg_non_seq(hdr))) {
if (msg_user(hdr) == LINK_CONFIG)
tipc_disc_rcv(net, skb, b);
else
tipc_bclink_rcv(net, skb);
return;
}
/* Locate neighboring node that sent packet */
n = tipc_node_find(net, msg_prevnode(hdr));
if (unlikely(!n))
goto discard;
tipc_node_lock(n);
/* Locate link endpoint that should handle packet */
l = n->links[bearer_id].link;
if (unlikely(!l))
goto unlock;
/* Is reception of this packet permitted at the moment ? */
if (unlikely(n->state != SELF_UP_PEER_UP))
if (!tipc_node_filter_skb(n, l, hdr))
goto unlock;
if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
tipc_bclink_sync_state(n, hdr);
/* Release acked broadcast messages */
if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
/* Check protocol and update link state */
rc = tipc_link_rcv(l, skb, &xmitq);
if (unlikely(rc & TIPC_LINK_UP_EVT))
tipc_link_activate(l);
if (unlikely(rc & TIPC_LINK_DOWN_EVT))
tipc_link_reset(l);
skb = NULL;
unlock:
tipc_node_unlock(n);
tipc_sk_rcv(net, &n->links[bearer_id].inputq);
maddr = &n->links[bearer_id].maddr;
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
tipc_node_put(n);
discard:
kfree_skb(skb);
}
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
int err;
......
......@@ -185,7 +185,6 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
u32 selector);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
static inline void tipc_node_lock(struct tipc_node *node)
......@@ -193,9 +192,6 @@ static inline void tipc_node_lock(struct tipc_node *node)
spin_lock_bh(&node->lock);
}
void tipc_node_fsm_evt(struct tipc_node *n, int evt);
bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr);
static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
int bearer_id = n->active_links[sel & 1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment