Commit 04d7b574 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: add multipoint-to-point flow control

We already have point-to-multipoint flow control within a group. But
we even need the opposite; -a scheme which can handle that potentially
hundreds of sources may try to send messages to the same destination
simultaneously without causing buffer overflow at the recipient. This
commit adds such a mechanism.

The algorithm works as follows:

- When a member detects a new, joining member, it initially set its
  state to JOINED and advertises a minimum window to the new member.
  This window is chosen so that the new member can send exactly one
  maximum sized message, or several smaller ones, to the recipient
  before it must stop and wait for an additional advertisement. This
  minimum window ADV_IDLE is set to 65 1kB blocks.

- When a member receives the first data message from a JOINED member,
  it changes the state of the latter to ACTIVE, and advertises a larger
  window ADV_ACTIVE = 12 x ADV_IDLE blocks to the sender, so it can
  continue sending with minimal disturbances to the data flow.

- The active members are kept in a dedicated linked list. Each time a
  message is received from an active member, it will be moved to the
  tail of that list. This way, we keep a record of which members have
  been most (tail) and least (head) recently active.

- There is a maximum number (16) of permitted simultaneous active
  senders per receiver. When this limit is reached, the receiver will
  not advertise anything immediately to a new sender, but instead put
  it in a PENDING state, and add it to a corresponding queue. At the
  same time, it will pick the least recently active member, send it an
  advertisement RECLAIM message, and set this member to state
  RECLAIMING.

- The reclaimee member has to respond with a REMIT message, meaning that
  it goes back to a send window of ADV_IDLE, and returns its unused
  advertised blocks beyond that value to the reclaiming member.

- When the reclaiming member receives the REMIT message, it unlinks
  the reclaimee from its active list, resets its state to JOINED, and
  notes that it is now back at ADV_IDLE advertised blocks to that
  member. If there are still unread data messages sent out by
  reclaimee before the REMIT, the member goes into an intermediate
  state REMITTED, where it stays until the said messages have been
  consumed.

- The returned advertised blocks can now be re-advertised to the
  pending member, which is now set to state ACTIVE and added to
  the active member list.

- To be proactive, i.e., to minimize the risk that any member will
  end up in the pending queue, we start reclaiming resources already
  when the number of active members exceeds 3/4 of the permitted
  maximum.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent a3bada70
...@@ -54,6 +54,10 @@ enum mbr_state { ...@@ -54,6 +54,10 @@ enum mbr_state {
MBR_JOINING, MBR_JOINING,
MBR_PUBLISHED, MBR_PUBLISHED,
MBR_JOINED, MBR_JOINED,
MBR_PENDING,
MBR_ACTIVE,
MBR_RECLAIMING,
MBR_REMITTED,
MBR_LEAVING MBR_LEAVING
}; };
...@@ -79,6 +83,9 @@ struct tipc_member { ...@@ -79,6 +83,9 @@ struct tipc_member {
struct tipc_group { struct tipc_group {
struct rb_root members; struct rb_root members;
struct list_head congested; struct list_head congested;
struct list_head pending;
struct list_head active;
struct list_head reclaiming;
struct tipc_nlist dests; struct tipc_nlist dests;
struct net *net; struct net *net;
int subid; int subid;
...@@ -88,6 +95,8 @@ struct tipc_group { ...@@ -88,6 +95,8 @@ struct tipc_group {
u32 scope; u32 scope;
u32 portid; u32 portid;
u16 member_cnt; u16 member_cnt;
u16 active_cnt;
u16 max_active;
u16 bc_snd_nxt; u16 bc_snd_nxt;
u16 bc_ackers; u16 bc_ackers;
bool loopback; bool loopback;
...@@ -97,12 +106,29 @@ struct tipc_group { ...@@ -97,12 +106,29 @@ struct tipc_group {
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
int mtyp, struct sk_buff_head *xmitq); int mtyp, struct sk_buff_head *xmitq);
static void tipc_group_decr_active(struct tipc_group *grp,
struct tipc_member *m)
{
if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING)
grp->active_cnt--;
}
static int tipc_group_rcvbuf_limit(struct tipc_group *grp) static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
{ {
int max_active, active_pool, idle_pool;
int mcnt = grp->member_cnt + 1; int mcnt = grp->member_cnt + 1;
/* Limit simultaneous reception from other members */
max_active = min(mcnt / 8, 64);
max_active = max(max_active, 16);
grp->max_active = max_active;
/* Reserve blocks for active and idle members */
active_pool = max_active * ADV_ACTIVE;
idle_pool = (mcnt - max_active) * ADV_IDLE;
/* Scale to bytes, considering worst-case truesize/msgsize ratio */ /* Scale to bytes, considering worst-case truesize/msgsize ratio */
return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
} }
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
...@@ -143,6 +169,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, ...@@ -143,6 +169,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
return NULL; return NULL;
tipc_nlist_init(&grp->dests, tipc_own_addr(net)); tipc_nlist_init(&grp->dests, tipc_own_addr(net));
INIT_LIST_HEAD(&grp->congested); INIT_LIST_HEAD(&grp->congested);
INIT_LIST_HEAD(&grp->active);
INIT_LIST_HEAD(&grp->pending);
INIT_LIST_HEAD(&grp->reclaiming);
grp->members = RB_ROOT; grp->members = RB_ROOT;
grp->net = net; grp->net = net;
grp->portid = portid; grp->portid = portid;
...@@ -286,6 +315,7 @@ static void tipc_group_delete_member(struct tipc_group *grp, ...@@ -286,6 +315,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
list_del_init(&m->list); list_del_init(&m->list);
list_del_init(&m->congested); list_del_init(&m->congested);
tipc_group_decr_active(grp, m);
/* If last member on a node, remove node from dest list */ /* If last member on a node, remove node from dest list */
if (!tipc_group_find_node(grp, m->node)) if (!tipc_group_find_node(grp, m->node))
...@@ -378,6 +408,10 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, ...@@ -378,6 +408,10 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
return true; return true;
if (state == MBR_JOINED && adv == ADV_IDLE) if (state == MBR_JOINED && adv == ADV_IDLE)
return true; return true;
if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
return true;
if (state == MBR_PENDING && adv == ADV_IDLE)
return true;
skb_queue_head_init(&xmitq); skb_queue_head_init(&xmitq);
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
tipc_node_distr_xmit(grp->net, &xmitq); tipc_node_distr_xmit(grp->net, &xmitq);
...@@ -523,7 +557,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, ...@@ -523,7 +557,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
u32 port, struct sk_buff_head *xmitq) u32 port, struct sk_buff_head *xmitq)
{ {
struct tipc_member *m; struct list_head *active = &grp->active;
int max_active = grp->max_active;
int reclaim_limit = max_active * 3 / 4;
int active_cnt = grp->active_cnt;
struct tipc_member *m, *rm;
m = tipc_group_find_member(grp, node, port); m = tipc_group_find_member(grp, node, port);
if (!m) if (!m)
...@@ -533,9 +571,41 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, ...@@ -533,9 +571,41 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
switch (m->state) { switch (m->state) {
case MBR_JOINED: case MBR_JOINED:
if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) /* Reclaim advertised space from least active member */
if (!list_empty(active) && active_cnt >= reclaim_limit) {
rm = list_first_entry(active, struct tipc_member, list);
rm->state = MBR_RECLAIMING;
list_move_tail(&rm->list, &grp->reclaiming);
tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
}
/* If max active, become pending and wait for reclaimed space */
if (active_cnt >= max_active) {
m->state = MBR_PENDING;
list_add_tail(&m->list, &grp->pending);
break;
}
/* Otherwise become active */
m->state = MBR_ACTIVE;
list_add_tail(&m->list, &grp->active);
grp->active_cnt++;
/* Fall through */
case MBR_ACTIVE:
if (!list_is_last(&m->list, &grp->active))
list_move_tail(&m->list, &grp->active);
if (m->advertised > (ADV_ACTIVE * 3 / 4))
break;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
break;
case MBR_REMITTED:
if (m->advertised > ADV_IDLE)
break;
m->state = MBR_JOINED;
if (m->advertised < ADV_IDLE) {
pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
}
break; break;
case MBR_RECLAIMING:
case MBR_DISCOVERED: case MBR_DISCOVERED:
case MBR_JOINING: case MBR_JOINING:
case MBR_LEAVING: case MBR_LEAVING:
...@@ -557,8 +627,10 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, ...@@ -557,8 +627,10 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
if (!skb) if (!skb)
return; return;
if (m->state == MBR_JOINED) if (m->state == MBR_ACTIVE)
adv = ADV_ACTIVE - m->advertised; adv = ADV_ACTIVE - m->advertised;
else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
adv = ADV_IDLE - m->advertised;
hdr = buf_msg(skb); hdr = buf_msg(skb);
...@@ -573,6 +645,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, ...@@ -573,6 +645,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
m->advertised += adv; m->advertised += adv;
} else if (mtyp == GRP_ACK_MSG) { } else if (mtyp == GRP_ACK_MSG) {
msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
} else if (mtyp == GRP_REMIT_MSG) {
msg_set_grp_remitted(hdr, m->window);
} }
__skb_queue_tail(xmitq, skb); __skb_queue_tail(xmitq, skb);
} }
...@@ -583,8 +657,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -583,8 +657,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
{ {
u32 node = msg_orignode(hdr); u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr); u32 port = msg_origport(hdr);
struct tipc_member *m; struct tipc_member *m, *pm;
struct tipc_msg *ehdr; struct tipc_msg *ehdr;
u16 remitted, in_flight;
if (!grp) if (!grp)
return; return;
...@@ -626,6 +701,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -626,6 +701,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
/* Wait until WITHDRAW event is received */ /* Wait until WITHDRAW event is received */
if (m->state != MBR_LEAVING) { if (m->state != MBR_LEAVING) {
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING; m->state = MBR_LEAVING;
return; return;
} }
...@@ -653,6 +729,48 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, ...@@ -653,6 +729,48 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
*usr_wakeup = true; *usr_wakeup = true;
m->usr_pending = false; m->usr_pending = false;
return; return;
case GRP_RECLAIM_MSG:
if (!m)
return;
*usr_wakeup = m->usr_pending;
m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
m->window = ADV_IDLE;
return;
case GRP_REMIT_MSG:
if (!m || m->state != MBR_RECLAIMING)
return;
list_del_init(&m->list);
grp->active_cnt--;
remitted = msg_grp_remitted(hdr);
/* Messages preceding the REMIT still in receive queue */
if (m->advertised > remitted) {
m->state = MBR_REMITTED;
in_flight = m->advertised - remitted;
}
/* All messages preceding the REMIT have been read */
if (m->advertised <= remitted) {
m->state = MBR_JOINED;
in_flight = 0;
}
/* ..and the REMIT overtaken by more messages => re-advertise */
if (m->advertised < remitted)
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
m->advertised = ADV_IDLE + in_flight;
/* Set oldest pending member to active and advertise */
if (list_empty(&grp->pending))
return;
pm = list_first_entry(&grp->pending, struct tipc_member, list);
pm->state = MBR_ACTIVE;
list_move_tail(&pm->list, &grp->active);
grp->active_cnt++;
if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
return;
default: default:
pr_warn("Received unknown GROUP_PROTO message\n"); pr_warn("Received unknown GROUP_PROTO message\n");
} }
...@@ -735,6 +853,7 @@ void tipc_group_member_evt(struct tipc_group *grp, ...@@ -735,6 +853,7 @@ void tipc_group_member_evt(struct tipc_group *grp,
/* Hold back event if more messages might be expected */ /* Hold back event if more messages might be expected */
if (m->state != MBR_LEAVING && node_up) { if (m->state != MBR_LEAVING && node_up) {
m->event_msg = skb; m->event_msg = skb;
tipc_group_decr_active(grp, m);
m->state = MBR_LEAVING; m->state = MBR_LEAVING;
} else { } else {
if (node_up) if (node_up)
......
...@@ -548,6 +548,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) ...@@ -548,6 +548,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n)
#define GRP_LEAVE_MSG 1 #define GRP_LEAVE_MSG 1
#define GRP_ADV_MSG 2 #define GRP_ADV_MSG 2
#define GRP_ACK_MSG 3 #define GRP_ACK_MSG 3
#define GRP_RECLAIM_MSG 4
#define GRP_REMIT_MSG 5
/* /*
* Word 1 * Word 1
...@@ -850,6 +852,16 @@ static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) ...@@ -850,6 +852,16 @@ static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n); msg_set_bits(m, 9, 16, 0xffff, n);
} }
static inline u16 msg_grp_remitted(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
/* Word 10 /* Word 10
*/ */
static inline u16 msg_grp_evt(struct tipc_msg *m) static inline u16 msg_grp_evt(struct tipc_msg *m)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment