Commit 555da9af authored by Karsten Graul's avatar Karsten Graul Committed by David S. Miller

net/smc: add event-based llc_flow framework

The new framework allows to start specific types of LLC control flows,
protects active flows and makes it possible to wait for flows to finish
before starting a new flow.
This mechanism is used for the LLC control layer to model flows like
'add link' or 'delete link' which need to send/receive several LLC
messages and are not allowed to get interrupted by the wrong type of
messages.
'Add link' or 'Delete link' messages arriving in the middle of a flow
are delayed and processed when the current flow finished.
Signed-off-by: default avatarKarsten Graul <kgraul@linux.ibm.com>
Reviewed-by: default avatarUrsula Braun <ubraun@linux.ibm.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 1569a3c4
...@@ -263,6 +263,7 @@ static void smc_lgr_free_work(struct work_struct *work) ...@@ -263,6 +263,7 @@ static void smc_lgr_free_work(struct work_struct *work)
if (smc_link_usable(lnk)) if (smc_link_usable(lnk))
lnk->state = SMC_LNK_INACTIVE; lnk->state = SMC_LNK_INACTIVE;
} }
wake_up_interruptible_all(&lgr->llc_waiter);
} }
smc_lgr_free(lgr); smc_lgr_free(lgr);
} }
...@@ -696,6 +697,7 @@ static void smc_lgr_cleanup(struct smc_link_group *lgr) ...@@ -696,6 +697,7 @@ static void smc_lgr_cleanup(struct smc_link_group *lgr)
if (smc_link_usable(lnk)) if (smc_link_usable(lnk))
lnk->state = SMC_LNK_INACTIVE; lnk->state = SMC_LNK_INACTIVE;
} }
wake_up_interruptible_all(&lgr->llc_waiter);
} }
} }
......
...@@ -197,6 +197,20 @@ struct smc_rtoken { /* address/key of remote RMB */ ...@@ -197,6 +197,20 @@ struct smc_rtoken { /* address/key of remote RMB */
struct smcd_dev; struct smcd_dev;
enum smc_llc_flowtype {
SMC_LLC_FLOW_NONE = 0,
SMC_LLC_FLOW_ADD_LINK = 2,
SMC_LLC_FLOW_DEL_LINK = 4,
SMC_LLC_FLOW_RKEY = 6,
};
struct smc_llc_qentry;
struct smc_llc_flow {
enum smc_llc_flowtype type;
struct smc_llc_qentry *qentry;
};
struct smc_link_group { struct smc_link_group {
struct list_head list; struct list_head list;
struct rb_root conns_all; /* connection tree */ struct rb_root conns_all; /* connection tree */
...@@ -238,6 +252,16 @@ struct smc_link_group { ...@@ -238,6 +252,16 @@ struct smc_link_group {
/* protects llc_event_q */ /* protects llc_event_q */
struct work_struct llc_event_work; struct work_struct llc_event_work;
/* llc event worker */ /* llc event worker */
wait_queue_head_t llc_waiter;
/* w4 next llc event */
struct smc_llc_flow llc_flow_lcl;
/* llc local control field */
struct smc_llc_flow llc_flow_rmt;
/* llc remote control field */
struct smc_llc_qentry *delayed_event;
/* arrived when flow active */
spinlock_t llc_flow_lock;
/* protects llc flow */
int llc_testlink_time; int llc_testlink_time;
/* link keep alive time */ /* link keep alive time */
}; };
......
...@@ -140,6 +140,154 @@ struct smc_llc_qentry { ...@@ -140,6 +140,154 @@ struct smc_llc_qentry {
union smc_llc_msg msg; union smc_llc_msg msg;
}; };
struct smc_llc_qentry *smc_llc_flow_qentry_clr(struct smc_llc_flow *flow)
{
struct smc_llc_qentry *qentry = flow->qentry;
flow->qentry = NULL;
return qentry;
}
void smc_llc_flow_qentry_del(struct smc_llc_flow *flow)
{
struct smc_llc_qentry *qentry;
if (flow->qentry) {
qentry = flow->qentry;
flow->qentry = NULL;
kfree(qentry);
}
}
static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
flow->qentry = qentry;
}
/* try to start a new llc flow, initiated by an incoming llc msg */
static bool smc_llc_flow_start(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
struct smc_link_group *lgr = qentry->link->lgr;
spin_lock_bh(&lgr->llc_flow_lock);
if (flow->type) {
/* a flow is already active */
if ((qentry->msg.raw.hdr.common.type == SMC_LLC_ADD_LINK ||
qentry->msg.raw.hdr.common.type == SMC_LLC_DELETE_LINK) &&
!lgr->delayed_event) {
lgr->delayed_event = qentry;
} else {
/* forget this llc request */
kfree(qentry);
}
spin_unlock_bh(&lgr->llc_flow_lock);
return false;
}
switch (qentry->msg.raw.hdr.common.type) {
case SMC_LLC_ADD_LINK:
flow->type = SMC_LLC_FLOW_ADD_LINK;
break;
case SMC_LLC_DELETE_LINK:
flow->type = SMC_LLC_FLOW_DEL_LINK;
break;
case SMC_LLC_CONFIRM_RKEY:
case SMC_LLC_DELETE_RKEY:
flow->type = SMC_LLC_FLOW_RKEY;
break;
default:
flow->type = SMC_LLC_FLOW_NONE;
}
if (qentry == lgr->delayed_event)
lgr->delayed_event = NULL;
spin_unlock_bh(&lgr->llc_flow_lock);
smc_llc_flow_qentry_set(flow, qentry);
return true;
}
/* start a new local llc flow, wait till current flow finished */
int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type)
{
enum smc_llc_flowtype allowed_remote = SMC_LLC_FLOW_NONE;
int rc;
/* all flows except confirm_rkey and delete_rkey are exclusive,
* confirm/delete rkey flows can run concurrently (local and remote)
*/
if (type == SMC_LLC_FLOW_RKEY)
allowed_remote = SMC_LLC_FLOW_RKEY;
again:
if (list_empty(&lgr->list))
return -ENODEV;
spin_lock_bh(&lgr->llc_flow_lock);
if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote)) {
lgr->llc_flow_lcl.type = type;
spin_unlock_bh(&lgr->llc_flow_lock);
return 0;
}
spin_unlock_bh(&lgr->llc_flow_lock);
rc = wait_event_interruptible_timeout(lgr->llc_waiter,
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote)),
SMC_LLC_WAIT_TIME);
if (!rc)
return -ETIMEDOUT;
goto again;
}
/* finish the current llc flow */
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow)
{
spin_lock_bh(&lgr->llc_flow_lock);
memset(flow, 0, sizeof(*flow));
flow->type = SMC_LLC_FLOW_NONE;
spin_unlock_bh(&lgr->llc_flow_lock);
if (!list_empty(&lgr->list) && lgr->delayed_event &&
flow == &lgr->llc_flow_lcl)
schedule_work(&lgr->llc_event_work);
else
wake_up_interruptible(&lgr->llc_waiter);
}
/* lnk is optional and used for early wakeup when link goes down, useful in
* cases where we wait for a response on the link after we sent a request
*/
struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
struct smc_link *lnk,
int time_out, u8 exp_msg)
{
struct smc_llc_flow *flow = &lgr->llc_flow_lcl;
wait_event_interruptible_timeout(lgr->llc_waiter,
(flow->qentry ||
(lnk && !smc_link_usable(lnk)) ||
list_empty(&lgr->list)),
time_out);
if (!flow->qentry ||
(lnk && !smc_link_usable(lnk)) || list_empty(&lgr->list)) {
smc_llc_flow_qentry_del(flow);
goto out;
}
if (exp_msg && flow->qentry->msg.raw.hdr.common.type != exp_msg) {
if (exp_msg == SMC_LLC_ADD_LINK &&
flow->qentry->msg.raw.hdr.common.type ==
SMC_LLC_DELETE_LINK) {
/* flow_start will delay the unexpected msg */
smc_llc_flow_start(&lgr->llc_flow_lcl,
smc_llc_flow_qentry_clr(flow));
return NULL;
}
smc_llc_flow_qentry_del(flow);
}
out:
return flow->qentry;
}
/********************************** send *************************************/ /********************************** send *************************************/
struct smc_llc_tx_pend { struct smc_llc_tx_pend {
...@@ -547,6 +695,16 @@ static void smc_llc_event_work(struct work_struct *work) ...@@ -547,6 +695,16 @@ static void smc_llc_event_work(struct work_struct *work)
llc_event_work); llc_event_work);
struct smc_llc_qentry *qentry; struct smc_llc_qentry *qentry;
if (!lgr->llc_flow_lcl.type && lgr->delayed_event) {
if (smc_link_usable(lgr->delayed_event->link)) {
smc_llc_event_handler(lgr->delayed_event);
} else {
qentry = lgr->delayed_event;
lgr->delayed_event = NULL;
kfree(qentry);
}
}
again: again:
spin_lock_bh(&lgr->llc_event_q_lock); spin_lock_bh(&lgr->llc_event_q_lock);
if (!list_empty(&lgr->llc_event_q)) { if (!list_empty(&lgr->llc_event_q)) {
...@@ -676,6 +834,8 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc) ...@@ -676,6 +834,8 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
INIT_WORK(&lgr->llc_event_work, smc_llc_event_work); INIT_WORK(&lgr->llc_event_work, smc_llc_event_work);
INIT_LIST_HEAD(&lgr->llc_event_q); INIT_LIST_HEAD(&lgr->llc_event_q);
spin_lock_init(&lgr->llc_event_q_lock); spin_lock_init(&lgr->llc_event_q_lock);
spin_lock_init(&lgr->llc_flow_lock);
init_waitqueue_head(&lgr->llc_waiter);
lgr->llc_testlink_time = net->ipv4.sysctl_tcp_keepalive_time; lgr->llc_testlink_time = net->ipv4.sysctl_tcp_keepalive_time;
} }
...@@ -683,7 +843,12 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc) ...@@ -683,7 +843,12 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
void smc_llc_lgr_clear(struct smc_link_group *lgr) void smc_llc_lgr_clear(struct smc_link_group *lgr)
{ {
smc_llc_event_flush(lgr); smc_llc_event_flush(lgr);
wake_up_interruptible_all(&lgr->llc_waiter);
cancel_work_sync(&lgr->llc_event_work); cancel_work_sync(&lgr->llc_event_work);
if (lgr->delayed_event) {
kfree(lgr->delayed_event);
lgr->delayed_event = NULL;
}
} }
int smc_llc_link_init(struct smc_link *link) int smc_llc_link_init(struct smc_link *link)
......
...@@ -63,6 +63,14 @@ int smc_llc_do_confirm_rkey(struct smc_link *link, ...@@ -63,6 +63,14 @@ int smc_llc_do_confirm_rkey(struct smc_link *link,
struct smc_buf_desc *rmb_desc); struct smc_buf_desc *rmb_desc);
int smc_llc_do_delete_rkey(struct smc_link *link, int smc_llc_do_delete_rkey(struct smc_link *link,
struct smc_buf_desc *rmb_desc); struct smc_buf_desc *rmb_desc);
int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type);
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow);
struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
struct smc_link *lnk,
int time_out, u8 exp_msg);
struct smc_llc_qentry *smc_llc_flow_qentry_clr(struct smc_llc_flow *flow);
void smc_llc_flow_qentry_del(struct smc_llc_flow *flow);
int smc_llc_init(void) __init; int smc_llc_init(void) __init;
#endif /* SMC_LLC_H */ #endif /* SMC_LLC_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment