Commit 58dc55f2 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller

tipc: use generic SKB list APIs to manage link transmission queue

Use standard SKB list APIs associated with struct sk_buff_head to
manage link transmission queue, having relevant code more clean.
Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 58d78b32
......@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
*/
static void bclink_retransmit_pkt(u32 after, u32 to)
{
struct sk_buff *buf;
struct sk_buff *skb;
buf = bcl->first_out;
while (buf && less_eq(buf_seqno(buf), after))
buf = buf->next;
tipc_link_retransmit(bcl, buf, mod(to - after));
skb_queue_walk(&bcl->outqueue, skb) {
if (more(buf_seqno(skb), after))
break;
}
tipc_link_retransmit(bcl, skb, mod(to - after));
}
/**
......@@ -245,14 +246,14 @@ void tipc_bclink_wakeup_users(void)
*/
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *crs;
struct sk_buff *skb, *tmp;
struct sk_buff *next;
unsigned int released = 0;
tipc_bclink_lock();
/* Bail out if tx queue is empty (no clean up is required) */
crs = bcl->first_out;
if (!crs)
skb = skb_peek(&bcl->outqueue);
if (!skb)
goto exit;
/* Determine which messages need to be acknowledged */
......@@ -271,41 +272,41 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
* Bail out if specified sequence number does not correspond
* to a message that has been sent and not yet acknowledged
*/
if (less(acked, buf_seqno(crs)) ||
if (less(acked, buf_seqno(skb)) ||
less(bcl->fsm_msg_cnt, acked) ||
less_eq(acked, n_ptr->bclink.acked))
goto exit;
}
/* Skip over packets that node has previously acknowledged */
while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
crs = crs->next;
skb_queue_walk(&bcl->outqueue, skb) {
if (more(buf_seqno(skb), n_ptr->bclink.acked))
break;
}
/* Update packets that node is now acknowledging */
skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
if (more(buf_seqno(skb), acked))
break;
while (crs && less_eq(buf_seqno(crs), acked)) {
next = crs->next;
if (crs != bcl->next_out)
bcbuf_decr_acks(crs);
else {
bcbuf_set_acks(crs, 0);
next = tipc_skb_queue_next(&bcl->outqueue, skb);
if (skb != bcl->next_out) {
bcbuf_decr_acks(skb);
} else {
bcbuf_set_acks(skb, 0);
bcl->next_out = next;
bclink_set_last_sent();
}
if (bcbuf_acks(crs) == 0) {
bcl->first_out = next;
bcl->out_queue_size--;
kfree_skb(crs);
if (bcbuf_acks(skb) == 0) {
__skb_unlink(skb, &bcl->outqueue);
kfree_skb(skb);
released = 1;
}
crs = next;
}
n_ptr->bclink.acked = acked;
/* Try resolving broadcast link congestion, if necessary */
if (unlikely(bcl->next_out)) {
tipc_link_push_packets(bcl);
bclink_set_last_sent();
......@@ -327,19 +328,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
struct sk_buff *buf;
/* Ignore "stale" link state info */
if (less_eq(last_sent, n_ptr->bclink.last_in))
return;
/* Update link synchronization state; quit if in sync */
bclink_update_last_sent(n_ptr, last_sent);
if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
return;
/* Update out-of-sync state; quit if loss is still unconfirmed */
if ((++n_ptr->bclink.oos_state) == 1) {
if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
return;
......@@ -347,12 +345,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
}
/* Don't NACK if one has been recently sent (or seen) */
if (n_ptr->bclink.oos_state & 0x1)
return;
/* Send NACK */
buf = tipc_buf_acquire(INT_H_SIZE);
if (buf) {
struct tipc_msg *msg = buf_msg(buf);
......@@ -425,9 +421,11 @@ int tipc_bclink_xmit(struct sk_buff *buf)
if (likely(bclink->bcast_nodes.count)) {
rc = __tipc_link_xmit(bcl, buf);
if (likely(!rc)) {
u32 len = skb_queue_len(&bcl->outqueue);
bclink_set_last_sent();
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += bcl->out_queue_size;
bcl->stats.accu_queue_sz += len;
}
bc = 1;
}
......@@ -462,7 +460,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_proto_xmit(node->active_links[node->addr & 1],
STATE_MSG, 0, 0, 0, 0, 0);
......@@ -484,7 +481,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
int deferred = 0;
/* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tipc_net_id)
goto exit;
......@@ -497,7 +493,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
goto unlock;
/* Handle broadcast protocol message */
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
if (msg_type(msg) != STATE_MSG)
goto unlock;
......@@ -518,14 +513,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
}
/* Handle in-sequence broadcast message */
seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) {
receive:
/* Deliver message to destination */
if (likely(msg_isdata(msg))) {
tipc_bclink_lock();
bclink_accept_pkt(node, seqno);
......@@ -574,7 +567,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
buf = NULL;
/* Determine new synchronization state */
tipc_node_lock(node);
if (unlikely(!tipc_node_is_up(node)))
goto unlock;
......@@ -594,7 +586,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
goto unlock;
/* Take in-sequence message from deferred queue & deliver it */
buf = node->bclink.deferred_head;
node->bclink.deferred_head = buf->next;
buf->next = NULL;
......@@ -603,7 +594,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
}
/* Handle out-of-sequence broadcast message */
if (less(next_in, seqno)) {
deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
&node->bclink.deferred_tail,
......@@ -963,6 +953,7 @@ int tipc_bclink_init(void)
sprintf(bcbearer->media.name, "tipc-broadcast");
spin_lock_init(&bclink->lock);
__skb_queue_head_init(&bcl->outqueue);
__skb_queue_head_init(&bcl->waiting_sks);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
......
This diff is collapsed.
......@@ -119,9 +119,7 @@ struct tipc_stats {
* @max_pkt: current maximum packet size for this link
* @max_pkt_target: desired maximum packet size for this link
* @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
* @out_queue_size: # of messages in outbound message queue
* @first_out: ptr to first outbound message in queue
* @last_out: ptr to last outbound message in queue
* @outqueue: outbound message queue
* @next_out_no: next sequence number to use for outbound messages
* @last_retransmitted: sequence number of most recently retransmitted message
* @stale_count: # of identical retransmit requests made by peer
......@@ -173,9 +171,7 @@ struct tipc_link {
u32 max_pkt_probes;
/* Sending */
u32 out_queue_size;
struct sk_buff *first_out;
struct sk_buff *last_out;
struct sk_buff_head outqueue;
u32 next_out_no;
u32 last_retransmitted;
u32 stale_count;
......@@ -233,6 +229,8 @@ u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
void tipc_link_retransmit(struct tipc_link *l_ptr,
struct sk_buff *start, u32 retransmits);
struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
const struct sk_buff *skb);
int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
......@@ -258,6 +256,11 @@ static inline int less_eq(u32 left, u32 right)
return mod(right - left) < 32768u;
}
static inline int more(u32 left, u32 right)
{
return !less_eq(left, right);
}
static inline int less(u32 left, u32 right)
{
return less_eq(left, right) && (mod(right) != mod(left));
......@@ -294,7 +297,7 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
static inline int link_congested(struct tipc_link *l_ptr)
{
return l_ptr->out_queue_size >= l_ptr->queue_limit[0];
return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
}
#endif
......@@ -265,16 +265,17 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
/**
* tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
* @bbuf: the existing buffer ("bundle")
* @buf: buffer to be appended
* @list: the buffer chain of the existing buffer ("bundle")
* @skb: buffer to be appended
* @mtu: max allowable size for the bundle buffer
* Consumes buffer if successful
* Returns true if bundling could be performed, otherwise false
*/
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
{
struct tipc_msg *bmsg = buf_msg(bbuf);
struct tipc_msg *msg = buf_msg(buf);
struct sk_buff *bskb = skb_peek_tail(list);
struct tipc_msg *bmsg = buf_msg(bskb);
struct tipc_msg *msg = buf_msg(skb);
unsigned int bsz = msg_size(bmsg);
unsigned int msz = msg_size(msg);
u32 start = align(bsz);
......@@ -289,35 +290,36 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
return false;
if (likely(msg_user(bmsg) != MSG_BUNDLER))
return false;
if (likely(!TIPC_SKB_CB(bbuf)->bundling))
if (likely(!TIPC_SKB_CB(bskb)->bundling))
return false;
if (unlikely(skb_tailroom(bbuf) < (pad + msz)))
if (unlikely(skb_tailroom(bskb) < (pad + msz)))
return false;
if (unlikely(max < (start + msz)))
return false;
skb_put(bbuf, pad + msz);
skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz);
skb_put(bskb, pad + msz);
skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
msg_set_size(bmsg, start + msz);
msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
bbuf->next = buf->next;
kfree_skb(buf);
kfree_skb(skb);
return true;
}
/**
* tipc_msg_make_bundle(): Create bundle buf and append message to its tail
* @buf: buffer to be appended and replaced
* @list: the buffer chain
* @skb: buffer to be appended and replaced
* @mtu: max allowable size for the bundle buffer, inclusive header
* @dnode: destination node for message. (Not always present in header)
* Replaces buffer if successful
* Returns true if success, otherwise false
*/
bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
u32 mtu, u32 dnode)
{
struct sk_buff *bbuf;
struct sk_buff *bskb;
struct tipc_msg *bmsg;
struct tipc_msg *msg = buf_msg(*buf);
struct tipc_msg *msg = buf_msg(skb);
u32 msz = msg_size(msg);
u32 max = mtu - INT_H_SIZE;
......@@ -330,21 +332,19 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
if (msz > (max / 2))
return false;
bbuf = tipc_buf_acquire(max);
if (!bbuf)
bskb = tipc_buf_acquire(max);
if (!bskb)
return false;
skb_trim(bbuf, INT_H_SIZE);
bmsg = buf_msg(bbuf);
skb_trim(bskb, INT_H_SIZE);
bmsg = buf_msg(bskb);
tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode);
msg_set_seqno(bmsg, msg_seqno(msg));
msg_set_ack(bmsg, msg_ack(msg));
msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
bbuf->next = (*buf)->next;
TIPC_SKB_CB(bbuf)->bundling = true;
tipc_msg_bundle(bbuf, *buf, mtu);
*buf = bbuf;
return true;
TIPC_SKB_CB(bskb)->bundling = true;
__skb_queue_tail(list, bskb);
return tipc_msg_bundle(list, skb, mtu);
}
/**
......
......@@ -734,9 +734,10 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
u32 mtu, u32 dnode);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu , struct sk_buff **chain);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment