Commit fd8aa909 authored by Juergen Gross's avatar Juergen Gross Committed by Boris Ostrovsky

xen: optimize xenbus driver for multiple concurrent xenstore accesses

Handling of multiple concurrent Xenstore accesses through xenbus driver
either from the kernel or user land is rather lame today: xenbus is
capable to have one access active only at one point of time.

Rewrite xenbus to handle multiple requests concurrently by making use
of the request id of the Xenstore protocol. This requires to:

- Instead of blocking inside xb_read() when trying to read data from
  the xenstore ring buffer do so only in the main loop of
  xenbus_thread().

- Instead of doing writes to the xenstore ring buffer in the context of
  the caller just queue the request and do the write in the dedicated
  xenbus thread.

- Instead of just forwarding the request id specified by the caller of
  xenbus to xenstore use a xenbus internal unique request id. This will
  allow multiple outstanding requests.

- Modify the locking scheme in order to allow multiple requests being
  active in parallel.

- Instead of waiting for the reply of a user's xenstore request after
  writing the request to the xenstore ring buffer return directly to
  the caller and do the waiting in the read path.

Additionally signal handling was optimized by avoiding waking up the
xenbus thread or sending an event to Xenstore in case the addressed
entity is known to be running already.

As a result communication with Xenstore is sped up by a factor of up
to 5: depending on the request type (read or write) and the amount of
data transferred the gain was at least 20% (small reads) and went up to
a factor of 5 for large writes.

In the end some more rough edges of xenbus have been smoothed:

- Handling of memory shortage when reading from xenstore ring buffer in
  the xenbus driver was not optimal: it was busy looping and issuing a
  warning in each loop.

- In case of xenstore not running in dom0 but in a stubdom we end up
  with two xenbus threads running as the initialization of xenbus in
  dom0 expecting a local xenstored will be redone later when connecting
  to the xenstore domain. Up to now this was no problem as locking
  would prevent the two xenbus threads interfering with each other, but
  this was just a waste of kernel resources.

- An out of memory situation while writing to or reading from the
  xenstore ring buffer no longer will lead to a possible loss of
  synchronization with xenstore.

- The user read and write part are now interruptible by signals.
Signed-off-by: default avatarJuergen Gross <jgross@suse.com>
Signed-off-by: default avatarBoris Ostrovsky <boris.ostrovsky@oracle.com>
parent 5584ea25
...@@ -32,6 +32,10 @@ ...@@ -32,6 +32,10 @@
#ifndef _XENBUS_XENBUS_H #ifndef _XENBUS_XENBUS_H
#define _XENBUS_XENBUS_H #define _XENBUS_XENBUS_H
#include <linux/mutex.h>
#include <linux/uio.h>
#include <xen/xenbus.h>
#define XEN_BUS_ID_SIZE 20 #define XEN_BUS_ID_SIZE 20
struct xen_bus_type { struct xen_bus_type {
...@@ -52,16 +56,49 @@ enum xenstore_init { ...@@ -52,16 +56,49 @@ enum xenstore_init {
XS_LOCAL, XS_LOCAL,
}; };
struct xs_watch_event {
struct list_head list;
unsigned int len;
struct xenbus_watch *handle;
const char *path;
const char *token;
char body[];
};
enum xb_req_state {
xb_req_state_queued,
xb_req_state_wait_reply,
xb_req_state_got_reply,
xb_req_state_aborted
};
struct xb_req_data {
struct list_head list;
wait_queue_head_t wq;
struct xsd_sockmsg msg;
enum xsd_sockmsg_type type;
char *body;
const struct kvec *vec;
int num_vecs;
int err;
enum xb_req_state state;
void (*cb)(struct xb_req_data *);
void *par;
};
extern enum xenstore_init xen_store_domain_type; extern enum xenstore_init xen_store_domain_type;
extern const struct attribute_group *xenbus_dev_groups[]; extern const struct attribute_group *xenbus_dev_groups[];
extern struct mutex xs_response_mutex;
extern struct list_head xs_reply_list;
extern struct list_head xb_write_list;
extern wait_queue_head_t xb_waitq;
extern struct mutex xb_write_mutex;
int xs_init(void); int xs_init(void);
int xb_init_comms(void); int xb_init_comms(void);
void xb_deinit_comms(void); void xb_deinit_comms(void);
int xb_write(const void *data, unsigned int len); int xs_watch_msg(struct xs_watch_event *event);
int xb_read(void *data, unsigned int len); void xs_request_exit(struct xb_req_data *req);
int xb_data_to_read(void);
int xb_wait_for_data_to_read(void);
int xenbus_match(struct device *_dev, struct device_driver *_drv); int xenbus_match(struct device *_dev, struct device_driver *_drv);
int xenbus_dev_probe(struct device *_dev); int xenbus_dev_probe(struct device *_dev);
...@@ -92,6 +129,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev, ...@@ -92,6 +129,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev,
void xenbus_ring_ops_init(void); void xenbus_ring_ops_init(void);
void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg); int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par);
void xenbus_dev_queue_reply(struct xb_req_data *req);
#endif #endif
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <linux/wait.h> #include <linux/wait.h>
#include <linux/interrupt.h> #include <linux/interrupt.h>
#include <linux/kthread.h>
#include <linux/sched.h> #include <linux/sched.h>
#include <linux/err.h> #include <linux/err.h>
#include <xen/xenbus.h> #include <xen/xenbus.h>
...@@ -42,11 +43,22 @@ ...@@ -42,11 +43,22 @@
#include <xen/page.h> #include <xen/page.h>
#include "xenbus.h" #include "xenbus.h"
/* A list of replies. Currently only one will ever be outstanding. */
LIST_HEAD(xs_reply_list);
/* A list of write requests. */
LIST_HEAD(xb_write_list);
DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
DEFINE_MUTEX(xb_write_mutex);
/* Protect xenbus reader thread against save/restore. */
DEFINE_MUTEX(xs_response_mutex);
static int xenbus_irq; static int xenbus_irq;
static struct task_struct *xenbus_task;
static DECLARE_WORK(probe_work, xenbus_probe); static DECLARE_WORK(probe_work, xenbus_probe);
static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
static irqreturn_t wake_waiting(int irq, void *unused) static irqreturn_t wake_waiting(int irq, void *unused)
{ {
...@@ -84,30 +96,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons, ...@@ -84,30 +96,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
return buf + MASK_XENSTORE_IDX(cons); return buf + MASK_XENSTORE_IDX(cons);
} }
static int xb_data_to_write(void)
{
struct xenstore_domain_interface *intf = xen_store_interface;
return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
!list_empty(&xb_write_list);
}
/** /**
* xb_write - low level write * xb_write - low level write
* @data: buffer to send * @data: buffer to send
* @len: length of buffer * @len: length of buffer
* *
* Returns 0 on success, error otherwise. * Returns number of bytes written or -err.
*/ */
int xb_write(const void *data, unsigned len) static int xb_write(const void *data, unsigned int len)
{ {
struct xenstore_domain_interface *intf = xen_store_interface; struct xenstore_domain_interface *intf = xen_store_interface;
XENSTORE_RING_IDX cons, prod; XENSTORE_RING_IDX cons, prod;
int rc; unsigned int bytes = 0;
while (len != 0) { while (len != 0) {
void *dst; void *dst;
unsigned int avail; unsigned int avail;
rc = wait_event_interruptible(
xb_waitq,
(intf->req_prod - intf->req_cons) !=
XENSTORE_RING_SIZE);
if (rc < 0)
return rc;
/* Read indexes, then verify. */ /* Read indexes, then verify. */
cons = intf->req_cons; cons = intf->req_cons;
prod = intf->req_prod; prod = intf->req_prod;
...@@ -115,6 +128,11 @@ int xb_write(const void *data, unsigned len) ...@@ -115,6 +128,11 @@ int xb_write(const void *data, unsigned len)
intf->req_cons = intf->req_prod = 0; intf->req_cons = intf->req_prod = 0;
return -EIO; return -EIO;
} }
if (!xb_data_to_write())
return bytes;
/* Must write data /after/ reading the consumer index. */
virt_mb();
dst = get_output_chunk(cons, prod, intf->req, &avail); dst = get_output_chunk(cons, prod, intf->req, &avail);
if (avail == 0) if (avail == 0)
...@@ -122,52 +140,45 @@ int xb_write(const void *data, unsigned len) ...@@ -122,52 +140,45 @@ int xb_write(const void *data, unsigned len)
if (avail > len) if (avail > len)
avail = len; avail = len;
/* Must write data /after/ reading the consumer index. */
virt_mb();
memcpy(dst, data, avail); memcpy(dst, data, avail);
data += avail; data += avail;
len -= avail; len -= avail;
bytes += avail;
/* Other side must not see new producer until data is there. */ /* Other side must not see new producer until data is there. */
virt_wmb(); virt_wmb();
intf->req_prod += avail; intf->req_prod += avail;
/* Implies mb(): other side will see the updated producer. */ /* Implies mb(): other side will see the updated producer. */
notify_remote_via_evtchn(xen_store_evtchn); if (prod <= intf->req_cons)
notify_remote_via_evtchn(xen_store_evtchn);
} }
return 0; return bytes;
} }
int xb_data_to_read(void) static int xb_data_to_read(void)
{ {
struct xenstore_domain_interface *intf = xen_store_interface; struct xenstore_domain_interface *intf = xen_store_interface;
return (intf->rsp_cons != intf->rsp_prod); return (intf->rsp_cons != intf->rsp_prod);
} }
int xb_wait_for_data_to_read(void) static int xb_read(void *data, unsigned int len)
{
return wait_event_interruptible(xb_waitq, xb_data_to_read());
}
int xb_read(void *data, unsigned len)
{ {
struct xenstore_domain_interface *intf = xen_store_interface; struct xenstore_domain_interface *intf = xen_store_interface;
XENSTORE_RING_IDX cons, prod; XENSTORE_RING_IDX cons, prod;
int rc; unsigned int bytes = 0;
while (len != 0) { while (len != 0) {
unsigned int avail; unsigned int avail;
const char *src; const char *src;
rc = xb_wait_for_data_to_read();
if (rc < 0)
return rc;
/* Read indexes, then verify. */ /* Read indexes, then verify. */
cons = intf->rsp_cons; cons = intf->rsp_cons;
prod = intf->rsp_prod; prod = intf->rsp_prod;
if (cons == prod)
return bytes;
if (!check_indexes(cons, prod)) { if (!check_indexes(cons, prod)) {
intf->rsp_cons = intf->rsp_prod = 0; intf->rsp_cons = intf->rsp_prod = 0;
return -EIO; return -EIO;
...@@ -185,17 +196,243 @@ int xb_read(void *data, unsigned len) ...@@ -185,17 +196,243 @@ int xb_read(void *data, unsigned len)
memcpy(data, src, avail); memcpy(data, src, avail);
data += avail; data += avail;
len -= avail; len -= avail;
bytes += avail;
/* Other side must not see free space until we've copied out */ /* Other side must not see free space until we've copied out */
virt_mb(); virt_mb();
intf->rsp_cons += avail; intf->rsp_cons += avail;
pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
/* Implies mb(): other side will see the updated consumer. */ /* Implies mb(): other side will see the updated consumer. */
notify_remote_via_evtchn(xen_store_evtchn); if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
notify_remote_via_evtchn(xen_store_evtchn);
}
return bytes;
}
static int process_msg(void)
{
static struct {
struct xsd_sockmsg msg;
char *body;
union {
void *alloc;
struct xs_watch_event *watch;
};
bool in_msg;
bool in_hdr;
unsigned int read;
} state;
struct xb_req_data *req;
int err;
unsigned int len;
if (!state.in_msg) {
state.in_msg = true;
state.in_hdr = true;
state.read = 0;
/*
* We must disallow save/restore while reading a message.
* A partial read across s/r leaves us out of sync with
* xenstored.
* xs_response_mutex is locked as long as we are processing one
* message. state.in_msg will be true as long as we are holding
* the lock here.
*/
mutex_lock(&xs_response_mutex);
if (!xb_data_to_read()) {
/* We raced with save/restore: pending data 'gone'. */
mutex_unlock(&xs_response_mutex);
state.in_msg = false;
return 0;
}
}
if (state.in_hdr) {
if (state.read != sizeof(state.msg)) {
err = xb_read((void *)&state.msg + state.read,
sizeof(state.msg) - state.read);
if (err < 0)
goto out;
state.read += err;
if (state.read != sizeof(state.msg))
return 0;
if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
err = -EINVAL;
goto out;
}
}
len = state.msg.len + 1;
if (state.msg.type == XS_WATCH_EVENT)
len += sizeof(*state.watch);
state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
if (!state.alloc)
return -ENOMEM;
if (state.msg.type == XS_WATCH_EVENT)
state.body = state.watch->body;
else
state.body = state.alloc;
state.in_hdr = false;
state.read = 0;
}
err = xb_read(state.body + state.read, state.msg.len - state.read);
if (err < 0)
goto out;
state.read += err;
if (state.read != state.msg.len)
return 0;
state.body[state.msg.len] = '\0';
if (state.msg.type == XS_WATCH_EVENT) {
state.watch->len = state.msg.len;
err = xs_watch_msg(state.watch);
} else {
err = -ENOENT;
mutex_lock(&xb_write_mutex);
list_for_each_entry(req, &xs_reply_list, list) {
if (req->msg.req_id == state.msg.req_id) {
if (req->state == xb_req_state_wait_reply) {
req->msg.type = state.msg.type;
req->msg.len = state.msg.len;
req->body = state.body;
req->state = xb_req_state_got_reply;
list_del(&req->list);
req->cb(req);
} else {
list_del(&req->list);
kfree(req);
}
err = 0;
break;
}
}
mutex_unlock(&xb_write_mutex);
if (err)
goto out;
} }
mutex_unlock(&xs_response_mutex);
state.in_msg = false;
state.alloc = NULL;
return err;
out:
mutex_unlock(&xs_response_mutex);
state.in_msg = false;
kfree(state.alloc);
state.alloc = NULL;
return err;
}
static int process_writes(void)
{
static struct {
struct xb_req_data *req;
int idx;
unsigned int written;
} state;
void *base;
unsigned int len;
int err = 0;
if (!xb_data_to_write())
return 0;
mutex_lock(&xb_write_mutex);
if (!state.req) {
state.req = list_first_entry(&xb_write_list,
struct xb_req_data, list);
state.idx = -1;
state.written = 0;
}
if (state.req->state == xb_req_state_aborted)
goto out_err;
while (state.idx < state.req->num_vecs) {
if (state.idx < 0) {
base = &state.req->msg;
len = sizeof(state.req->msg);
} else {
base = state.req->vec[state.idx].iov_base;
len = state.req->vec[state.idx].iov_len;
}
err = xb_write(base + state.written, len - state.written);
if (err < 0)
goto out_err;
state.written += err;
if (state.written != len)
goto out;
state.idx++;
state.written = 0;
}
list_del(&state.req->list);
state.req->state = xb_req_state_wait_reply;
list_add_tail(&state.req->list, &xs_reply_list);
state.req = NULL;
out:
mutex_unlock(&xb_write_mutex);
return 0;
out_err:
state.req->msg.type = XS_ERROR;
state.req->err = err;
list_del(&state.req->list);
if (state.req->state == xb_req_state_aborted)
kfree(state.req);
else {
state.req->state = xb_req_state_got_reply;
wake_up(&state.req->wq);
}
mutex_unlock(&xb_write_mutex);
state.req = NULL;
return err;
}
static int xb_thread_work(void)
{
return xb_data_to_read() || xb_data_to_write();
}
static int xenbus_thread(void *unused)
{
int err;
while (!kthread_should_stop()) {
if (wait_event_interruptible(xb_waitq, xb_thread_work()))
continue;
err = process_msg();
if (err == -ENOMEM)
schedule();
else if (err)
pr_warn_ratelimited("error %d while reading message\n",
err);
err = process_writes();
if (err)
pr_warn_ratelimited("error %d while writing message\n",
err);
}
xenbus_task = NULL;
return 0; return 0;
} }
...@@ -223,6 +460,7 @@ int xb_init_comms(void) ...@@ -223,6 +460,7 @@ int xb_init_comms(void)
rebind_evtchn_irq(xen_store_evtchn, xenbus_irq); rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
} else { } else {
int err; int err;
err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting, err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
0, "xenbus", &xb_waitq); 0, "xenbus", &xb_waitq);
if (err < 0) { if (err < 0) {
...@@ -231,6 +469,13 @@ int xb_init_comms(void) ...@@ -231,6 +469,13 @@ int xb_init_comms(void)
} }
xenbus_irq = err; xenbus_irq = err;
if (!xenbus_task) {
xenbus_task = kthread_run(xenbus_thread, NULL,
"xenbus");
if (IS_ERR(xenbus_task))
return PTR_ERR(xenbus_task);
}
} }
return 0; return 0;
......
...@@ -113,6 +113,7 @@ struct xenbus_file_priv { ...@@ -113,6 +113,7 @@ struct xenbus_file_priv {
struct list_head read_buffers; struct list_head read_buffers;
wait_queue_head_t read_waitq; wait_queue_head_t read_waitq;
struct kref kref;
}; };
/* Read out any raw xenbus messages queued up. */ /* Read out any raw xenbus messages queued up. */
...@@ -297,6 +298,107 @@ static void watch_fired(struct xenbus_watch *watch, ...@@ -297,6 +298,107 @@ static void watch_fired(struct xenbus_watch *watch,
mutex_unlock(&adap->dev_data->reply_mutex); mutex_unlock(&adap->dev_data->reply_mutex);
} }
static void xenbus_file_free(struct kref *kref)
{
struct xenbus_file_priv *u;
struct xenbus_transaction_holder *trans, *tmp;
struct watch_adapter *watch, *tmp_watch;
struct read_buffer *rb, *tmp_rb;
u = container_of(kref, struct xenbus_file_priv, kref);
/*
* No need for locking here because there are no other users,
* by definition.
*/
list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
xenbus_transaction_end(trans->handle, 1);
list_del(&trans->list);
kfree(trans);
}
list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
unregister_xenbus_watch(&watch->watch);
list_del(&watch->list);
free_watch_adapter(watch);
}
list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
list_del(&rb->list);
kfree(rb);
}
kfree(u);
}
static struct xenbus_transaction_holder *xenbus_get_transaction(
struct xenbus_file_priv *u, uint32_t tx_id)
{
struct xenbus_transaction_holder *trans;
list_for_each_entry(trans, &u->transactions, list)
if (trans->handle.id == tx_id)
return trans;
return NULL;
}
void xenbus_dev_queue_reply(struct xb_req_data *req)
{
struct xenbus_file_priv *u = req->par;
struct xenbus_transaction_holder *trans = NULL;
int rc;
LIST_HEAD(staging_q);
xs_request_exit(req);
mutex_lock(&u->msgbuffer_mutex);
if (req->type == XS_TRANSACTION_START) {
trans = xenbus_get_transaction(u, 0);
if (WARN_ON(!trans))
goto out;
if (req->msg.type == XS_ERROR) {
list_del(&trans->list);
kfree(trans);
} else {
rc = kstrtou32(req->body, 10, &trans->handle.id);
if (WARN_ON(rc))
goto out;
}
} else if (req->msg.type == XS_TRANSACTION_END) {
trans = xenbus_get_transaction(u, req->msg.tx_id);
if (WARN_ON(!trans))
goto out;
list_del(&trans->list);
kfree(trans);
}
mutex_unlock(&u->msgbuffer_mutex);
mutex_lock(&u->reply_mutex);
rc = queue_reply(&staging_q, &req->msg, sizeof(req->msg));
if (!rc)
rc = queue_reply(&staging_q, req->body, req->msg.len);
if (!rc) {
list_splice_tail(&staging_q, &u->read_buffers);
wake_up(&u->read_waitq);
} else {
queue_cleanup(&staging_q);
}
mutex_unlock(&u->reply_mutex);
kfree(req->body);
kfree(req);
kref_put(&u->kref, xenbus_file_free);
return;
out:
mutex_unlock(&u->msgbuffer_mutex);
}
static int xenbus_command_reply(struct xenbus_file_priv *u, static int xenbus_command_reply(struct xenbus_file_priv *u,
unsigned int msg_type, const char *reply) unsigned int msg_type, const char *reply)
{ {
...@@ -317,6 +419,9 @@ static int xenbus_command_reply(struct xenbus_file_priv *u, ...@@ -317,6 +419,9 @@ static int xenbus_command_reply(struct xenbus_file_priv *u,
wake_up(&u->read_waitq); wake_up(&u->read_waitq);
mutex_unlock(&u->reply_mutex); mutex_unlock(&u->reply_mutex);
if (!rc)
kref_put(&u->kref, xenbus_file_free);
return rc; return rc;
} }
...@@ -324,57 +429,22 @@ static int xenbus_write_transaction(unsigned msg_type, ...@@ -324,57 +429,22 @@ static int xenbus_write_transaction(unsigned msg_type,
struct xenbus_file_priv *u) struct xenbus_file_priv *u)
{ {
int rc; int rc;
void *reply;
struct xenbus_transaction_holder *trans = NULL; struct xenbus_transaction_holder *trans = NULL;
LIST_HEAD(staging_q);
if (msg_type == XS_TRANSACTION_START) { if (msg_type == XS_TRANSACTION_START) {
trans = kmalloc(sizeof(*trans), GFP_KERNEL); trans = kzalloc(sizeof(*trans), GFP_KERNEL);
if (!trans) { if (!trans) {
rc = -ENOMEM; rc = -ENOMEM;
goto out; goto out;
} }
} else if (u->u.msg.tx_id != 0) { list_add(&trans->list, &u->transactions);
list_for_each_entry(trans, &u->transactions, list) } else if (u->u.msg.tx_id != 0 &&
if (trans->handle.id == u->u.msg.tx_id) !xenbus_get_transaction(u, u->u.msg.tx_id))
break; return xenbus_command_reply(u, XS_ERROR, "ENOENT");
if (&trans->list == &u->transactions)
return xenbus_command_reply(u, XS_ERROR, "ENOENT");
}
reply = xenbus_dev_request_and_reply(&u->u.msg);
if (IS_ERR(reply)) {
if (msg_type == XS_TRANSACTION_START)
kfree(trans);
rc = PTR_ERR(reply);
goto out;
}
if (msg_type == XS_TRANSACTION_START) { rc = xenbus_dev_request_and_reply(&u->u.msg, u);
if (u->u.msg.type == XS_ERROR) if (rc)
kfree(trans);
else {
trans->handle.id = simple_strtoul(reply, NULL, 0);
list_add(&trans->list, &u->transactions);
}
} else if (u->u.msg.type == XS_TRANSACTION_END) {
list_del(&trans->list);
kfree(trans); kfree(trans);
}
mutex_lock(&u->reply_mutex);
rc = queue_reply(&staging_q, &u->u.msg, sizeof(u->u.msg));
if (!rc)
rc = queue_reply(&staging_q, reply, u->u.msg.len);
if (!rc) {
list_splice_tail(&staging_q, &u->read_buffers);
wake_up(&u->read_waitq);
} else {
queue_cleanup(&staging_q);
}
mutex_unlock(&u->reply_mutex);
kfree(reply);
out: out:
return rc; return rc;
...@@ -506,6 +576,8 @@ static ssize_t xenbus_file_write(struct file *filp, ...@@ -506,6 +576,8 @@ static ssize_t xenbus_file_write(struct file *filp,
* OK, now we have a complete message. Do something with it. * OK, now we have a complete message. Do something with it.
*/ */
kref_get(&u->kref);
msg_type = u->u.msg.type; msg_type = u->u.msg.type;
switch (msg_type) { switch (msg_type) {
...@@ -520,8 +592,10 @@ static ssize_t xenbus_file_write(struct file *filp, ...@@ -520,8 +592,10 @@ static ssize_t xenbus_file_write(struct file *filp,
ret = xenbus_write_transaction(msg_type, u); ret = xenbus_write_transaction(msg_type, u);
break; break;
} }
if (ret != 0) if (ret != 0) {
rc = ret; rc = ret;
kref_put(&u->kref, xenbus_file_free);
}
/* Buffered message consumed */ /* Buffered message consumed */
u->len = 0; u->len = 0;
...@@ -546,6 +620,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp) ...@@ -546,6 +620,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
if (u == NULL) if (u == NULL)
return -ENOMEM; return -ENOMEM;
kref_init(&u->kref);
INIT_LIST_HEAD(&u->transactions); INIT_LIST_HEAD(&u->transactions);
INIT_LIST_HEAD(&u->watches); INIT_LIST_HEAD(&u->watches);
INIT_LIST_HEAD(&u->read_buffers); INIT_LIST_HEAD(&u->read_buffers);
...@@ -562,32 +638,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp) ...@@ -562,32 +638,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
static int xenbus_file_release(struct inode *inode, struct file *filp) static int xenbus_file_release(struct inode *inode, struct file *filp)
{ {
struct xenbus_file_priv *u = filp->private_data; struct xenbus_file_priv *u = filp->private_data;
struct xenbus_transaction_holder *trans, *tmp;
struct watch_adapter *watch, *tmp_watch;
struct read_buffer *rb, *tmp_rb;
/*
* No need for locking here because there are no other users,
* by definition.
*/
list_for_each_entry_safe(trans, tmp, &u->transactions, list) { kref_put(&u->kref, xenbus_file_free);
xenbus_transaction_end(trans->handle, 1);
list_del(&trans->list);
kfree(trans);
}
list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
unregister_xenbus_watch(&watch->watch);
list_del(&watch->list);
free_watch_adapter(watch);
}
list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
list_del(&rb->list);
kfree(rb);
}
kfree(u);
return 0; return 0;
} }
......
...@@ -43,6 +43,7 @@ ...@@ -43,6 +43,7 @@
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/fcntl.h> #include <linux/fcntl.h>
#include <linux/kthread.h> #include <linux/kthread.h>
#include <linux/reboot.h>
#include <linux/rwsem.h> #include <linux/rwsem.h>
#include <linux/mutex.h> #include <linux/mutex.h>
#include <asm/xen/hypervisor.h> #include <asm/xen/hypervisor.h>
...@@ -50,61 +51,28 @@ ...@@ -50,61 +51,28 @@
#include <xen/xen.h> #include <xen/xen.h>
#include "xenbus.h" #include "xenbus.h"
struct xs_stored_msg { /*
struct list_head list; * Framework to protect suspend/resume handling against normal Xenstore
* message handling:
struct xsd_sockmsg hdr; * During suspend/resume there must be no open transaction and no pending
* Xenstore request.
union { * New watch events happening in this time can be ignored by firing all watches
/* Queued replies. */ * after resume.
struct { */
char *body;
} reply;
/* Queued watch events. */
struct {
struct xenbus_watch *handle;
const char *path;
const char *token;
} watch;
} u;
};
struct xs_handle { /* Lock protecting enter/exit critical region. */
/* A list of replies. Currently only one will ever be outstanding. */ static DEFINE_SPINLOCK(xs_state_lock);
struct list_head reply_list; /* Number of users in critical region (protected by xs_state_lock). */
spinlock_t reply_lock; static unsigned int xs_state_users;
wait_queue_head_t reply_waitq; /* Suspend handler waiting or already active (protected by xs_state_lock)? */
static int xs_suspend_active;
/* /* Unique Xenstore request id (protected by xs_state_lock). */
* Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex. static uint32_t xs_request_id;
* response_mutex is never taken simultaneously with the other three.
*
* transaction_mutex must be held before incrementing
* transaction_count. The mutex is held when a suspend is in
* progress to prevent new transactions starting.
*
* When decrementing transaction_count to zero the wait queue
* should be woken up, the suspend code waits for count to
* reach zero.
*/
/* One request at a time. */
struct mutex request_mutex;
/* Protect xenbus reader thread against save/restore. */
struct mutex response_mutex;
/* Protect transactions against save/restore. */
struct mutex transaction_mutex;
atomic_t transaction_count;
wait_queue_head_t transaction_wq;
/* Protect watch (de)register against save/restore. */
struct rw_semaphore watch_mutex;
};
static struct xs_handle xs_state; /* Wait queue for all callers waiting for critical region to become usable. */
static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
/* Wait queue for suspend handling waiting for critical region being empty. */
static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
/* List of registered watches, and a lock to protect it. */ /* List of registered watches, and a lock to protect it. */
static LIST_HEAD(watches); static LIST_HEAD(watches);
...@@ -114,6 +82,9 @@ static DEFINE_SPINLOCK(watches_lock); ...@@ -114,6 +82,9 @@ static DEFINE_SPINLOCK(watches_lock);
static LIST_HEAD(watch_events); static LIST_HEAD(watch_events);
static DEFINE_SPINLOCK(watch_events_lock); static DEFINE_SPINLOCK(watch_events_lock);
/* Protect watch (de)register against save/restore. */
static DECLARE_RWSEM(xs_watch_rwsem);
/* /*
* Details of the xenwatch callback kernel thread. The thread waits on the * Details of the xenwatch callback kernel thread. The thread waits on the
* watch_events_waitq for work to do (queued on watch_events list). When it * watch_events_waitq for work to do (queued on watch_events list). When it
...@@ -124,6 +95,59 @@ static pid_t xenwatch_pid; ...@@ -124,6 +95,59 @@ static pid_t xenwatch_pid;
static DEFINE_MUTEX(xenwatch_mutex); static DEFINE_MUTEX(xenwatch_mutex);
static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq); static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
static void xs_suspend_enter(void)
{
spin_lock(&xs_state_lock);
xs_suspend_active++;
spin_unlock(&xs_state_lock);
wait_event(xs_state_exit_wq, xs_state_users == 0);
}
static void xs_suspend_exit(void)
{
spin_lock(&xs_state_lock);
xs_suspend_active--;
spin_unlock(&xs_state_lock);
wake_up_all(&xs_state_enter_wq);
}
static uint32_t xs_request_enter(struct xb_req_data *req)
{
uint32_t rq_id;
req->type = req->msg.type;
spin_lock(&xs_state_lock);
while (!xs_state_users && xs_suspend_active) {
spin_unlock(&xs_state_lock);
wait_event(xs_state_enter_wq, xs_suspend_active == 0);
spin_lock(&xs_state_lock);
}
if (req->type == XS_TRANSACTION_START)
xs_state_users++;
xs_state_users++;
rq_id = xs_request_id++;
spin_unlock(&xs_state_lock);
return rq_id;
}
void xs_request_exit(struct xb_req_data *req)
{
spin_lock(&xs_state_lock);
xs_state_users--;
if ((req->type == XS_TRANSACTION_START && req->msg.type == XS_ERROR) ||
req->type == XS_TRANSACTION_END)
xs_state_users--;
spin_unlock(&xs_state_lock);
if (xs_suspend_active && !xs_state_users)
wake_up(&xs_state_exit_wq);
}
static int get_error(const char *errorstring) static int get_error(const char *errorstring)
{ {
unsigned int i; unsigned int i;
...@@ -161,21 +185,24 @@ static bool xenbus_ok(void) ...@@ -161,21 +185,24 @@ static bool xenbus_ok(void)
} }
return false; return false;
} }
static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
static bool test_reply(struct xb_req_data *req)
{ {
struct xs_stored_msg *msg; if (req->state == xb_req_state_got_reply || !xenbus_ok())
char *body; return true;
spin_lock(&xs_state.reply_lock); /* Make sure to reread req->state each time. */
barrier();
while (list_empty(&xs_state.reply_list)) { return false;
spin_unlock(&xs_state.reply_lock); }
if (xenbus_ok())
/* XXX FIXME: Avoid synchronous wait for response here. */ static void *read_reply(struct xb_req_data *req)
wait_event_timeout(xs_state.reply_waitq, {
!list_empty(&xs_state.reply_list), while (req->state != xb_req_state_got_reply) {
msecs_to_jiffies(500)); wait_event(req->wq, test_reply(req));
else {
if (!xenbus_ok())
/* /*
* If we are in the process of being shut-down there is * If we are in the process of being shut-down there is
* no point of trying to contact XenBus - it is either * no point of trying to contact XenBus - it is either
...@@ -183,76 +210,82 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) ...@@ -183,76 +210,82 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
* has been killed or is unreachable. * has been killed or is unreachable.
*/ */
return ERR_PTR(-EIO); return ERR_PTR(-EIO);
} if (req->err)
spin_lock(&xs_state.reply_lock); return ERR_PTR(req->err);
} }
msg = list_entry(xs_state.reply_list.next, return req->body;
struct xs_stored_msg, list); }
list_del(&msg->list);
spin_unlock(&xs_state.reply_lock); static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
{
bool notify;
*type = msg->hdr.type; req->msg = *msg;
if (len) req->err = 0;
*len = msg->hdr.len; req->state = xb_req_state_queued;
body = msg->u.reply.body; init_waitqueue_head(&req->wq);
kfree(msg); req->msg.req_id = xs_request_enter(req);
return body; mutex_lock(&xb_write_mutex);
} list_add_tail(&req->list, &xb_write_list);
notify = list_is_singular(&xb_write_list);
mutex_unlock(&xb_write_mutex);
static void transaction_start(void) if (notify)
{ wake_up(&xb_waitq);
mutex_lock(&xs_state.transaction_mutex);
atomic_inc(&xs_state.transaction_count);
mutex_unlock(&xs_state.transaction_mutex);
} }
static void transaction_end(void) static void *xs_wait_for_reply(struct xb_req_data *req, struct xsd_sockmsg *msg)
{ {
if (atomic_dec_and_test(&xs_state.transaction_count)) void *ret;
wake_up(&xs_state.transaction_wq);
}
static void transaction_suspend(void) ret = read_reply(req);
{
mutex_lock(&xs_state.transaction_mutex); xs_request_exit(req);
wait_event(xs_state.transaction_wq,
atomic_read(&xs_state.transaction_count) == 0); msg->type = req->msg.type;
msg->len = req->msg.len;
mutex_lock(&xb_write_mutex);
if (req->state == xb_req_state_queued ||
req->state == xb_req_state_wait_reply)
req->state = xb_req_state_aborted;
else
kfree(req);
mutex_unlock(&xb_write_mutex);
return ret;
} }
static void transaction_resume(void) static void xs_wake_up(struct xb_req_data *req)
{ {
mutex_unlock(&xs_state.transaction_mutex); wake_up(&req->wq);
} }
void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
{ {
void *ret; struct xb_req_data *req;
enum xsd_sockmsg_type type = msg->type; struct kvec *vec;
int err;
if (type == XS_TRANSACTION_START) req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);
transaction_start(); if (!req)
return -ENOMEM;
mutex_lock(&xs_state.request_mutex);
err = xb_write(msg, sizeof(*msg) + msg->len); vec = (struct kvec *)(req + 1);
if (err) { vec->iov_len = msg->len;
msg->type = XS_ERROR; vec->iov_base = msg + 1;
ret = ERR_PTR(err);
} else
ret = read_reply(&msg->type, &msg->len);
mutex_unlock(&xs_state.request_mutex); req->vec = vec;
req->num_vecs = 1;
req->cb = xenbus_dev_queue_reply;
req->par = par;
if ((msg->type == XS_TRANSACTION_END) || xs_send(req, msg);
((type == XS_TRANSACTION_START) && (msg->type == XS_ERROR)))
transaction_end();
return ret; return 0;
} }
EXPORT_SYMBOL(xenbus_dev_request_and_reply); EXPORT_SYMBOL(xenbus_dev_request_and_reply);
...@@ -263,37 +296,31 @@ static void *xs_talkv(struct xenbus_transaction t, ...@@ -263,37 +296,31 @@ static void *xs_talkv(struct xenbus_transaction t,
unsigned int num_vecs, unsigned int num_vecs,
unsigned int *len) unsigned int *len)
{ {
struct xb_req_data *req;
struct xsd_sockmsg msg; struct xsd_sockmsg msg;
void *ret = NULL; void *ret = NULL;
unsigned int i; unsigned int i;
int err; int err;
req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
if (!req)
return ERR_PTR(-ENOMEM);
req->vec = iovec;
req->num_vecs = num_vecs;
req->cb = xs_wake_up;
msg.tx_id = t.id; msg.tx_id = t.id;
msg.req_id = 0;
msg.type = type; msg.type = type;
msg.len = 0; msg.len = 0;
for (i = 0; i < num_vecs; i++) for (i = 0; i < num_vecs; i++)
msg.len += iovec[i].iov_len; msg.len += iovec[i].iov_len;
mutex_lock(&xs_state.request_mutex); xs_send(req, &msg);
err = xb_write(&msg, sizeof(msg));
if (err) {
mutex_unlock(&xs_state.request_mutex);
return ERR_PTR(err);
}
for (i = 0; i < num_vecs; i++) {
err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
if (err) {
mutex_unlock(&xs_state.request_mutex);
return ERR_PTR(err);
}
}
ret = read_reply(&msg.type, len);
mutex_unlock(&xs_state.request_mutex); ret = xs_wait_for_reply(req, &msg);
if (len)
*len = msg.len;
if (IS_ERR(ret)) if (IS_ERR(ret))
return ret; return ret;
...@@ -500,13 +527,9 @@ int xenbus_transaction_start(struct xenbus_transaction *t) ...@@ -500,13 +527,9 @@ int xenbus_transaction_start(struct xenbus_transaction *t)
{ {
char *id_str; char *id_str;
transaction_start();
id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL); id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
if (IS_ERR(id_str)) { if (IS_ERR(id_str))
transaction_end();
return PTR_ERR(id_str); return PTR_ERR(id_str);
}
t->id = simple_strtoul(id_str, NULL, 0); t->id = simple_strtoul(id_str, NULL, 0);
kfree(id_str); kfree(id_str);
...@@ -520,18 +543,13 @@ EXPORT_SYMBOL_GPL(xenbus_transaction_start); ...@@ -520,18 +543,13 @@ EXPORT_SYMBOL_GPL(xenbus_transaction_start);
int xenbus_transaction_end(struct xenbus_transaction t, int abort) int xenbus_transaction_end(struct xenbus_transaction t, int abort)
{ {
char abortstr[2]; char abortstr[2];
int err;
if (abort) if (abort)
strcpy(abortstr, "F"); strcpy(abortstr, "F");
else else
strcpy(abortstr, "T"); strcpy(abortstr, "T");
err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); return xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
transaction_end();
return err;
} }
EXPORT_SYMBOL_GPL(xenbus_transaction_end); EXPORT_SYMBOL_GPL(xenbus_transaction_end);
...@@ -664,6 +682,30 @@ static struct xenbus_watch *find_watch(const char *token) ...@@ -664,6 +682,30 @@ static struct xenbus_watch *find_watch(const char *token)
return NULL; return NULL;
} }
int xs_watch_msg(struct xs_watch_event *event)
{
if (count_strings(event->body, event->len) != 2) {
kfree(event);
return -EINVAL;
}
event->path = (const char *)event->body;
event->token = (const char *)strchr(event->body, '\0') + 1;
spin_lock(&watches_lock);
event->handle = find_watch(event->token);
if (event->handle != NULL) {
spin_lock(&watch_events_lock);
list_add_tail(&event->list, &watch_events);
wake_up(&watch_events_waitq);
spin_unlock(&watch_events_lock);
} else
kfree(event);
spin_unlock(&watches_lock);
return 0;
}
/* /*
* Certain older XenBus toolstack cannot handle reading values that are * Certain older XenBus toolstack cannot handle reading values that are
* not populated. Some Xen 3.4 installation are incapable of doing this * not populated. Some Xen 3.4 installation are incapable of doing this
...@@ -712,7 +754,7 @@ int register_xenbus_watch(struct xenbus_watch *watch) ...@@ -712,7 +754,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
sprintf(token, "%lX", (long)watch); sprintf(token, "%lX", (long)watch);
down_read(&xs_state.watch_mutex); down_read(&xs_watch_rwsem);
spin_lock(&watches_lock); spin_lock(&watches_lock);
BUG_ON(find_watch(token)); BUG_ON(find_watch(token));
...@@ -727,7 +769,7 @@ int register_xenbus_watch(struct xenbus_watch *watch) ...@@ -727,7 +769,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
spin_unlock(&watches_lock); spin_unlock(&watches_lock);
} }
up_read(&xs_state.watch_mutex); up_read(&xs_watch_rwsem);
return err; return err;
} }
...@@ -735,13 +777,13 @@ EXPORT_SYMBOL_GPL(register_xenbus_watch); ...@@ -735,13 +777,13 @@ EXPORT_SYMBOL_GPL(register_xenbus_watch);
void unregister_xenbus_watch(struct xenbus_watch *watch) void unregister_xenbus_watch(struct xenbus_watch *watch)
{ {
struct xs_stored_msg *msg, *tmp; struct xs_watch_event *event, *tmp;
char token[sizeof(watch) * 2 + 1]; char token[sizeof(watch) * 2 + 1];
int err; int err;
sprintf(token, "%lX", (long)watch); sprintf(token, "%lX", (long)watch);
down_read(&xs_state.watch_mutex); down_read(&xs_watch_rwsem);
spin_lock(&watches_lock); spin_lock(&watches_lock);
BUG_ON(!find_watch(token)); BUG_ON(!find_watch(token));
...@@ -752,7 +794,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) ...@@ -752,7 +794,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
if (err) if (err)
pr_warn("Failed to release watch %s: %i\n", watch->node, err); pr_warn("Failed to release watch %s: %i\n", watch->node, err);
up_read(&xs_state.watch_mutex); up_read(&xs_watch_rwsem);
/* Make sure there are no callbacks running currently (unless /* Make sure there are no callbacks running currently (unless
its us) */ its us) */
...@@ -761,12 +803,11 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) ...@@ -761,12 +803,11 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
/* Cancel pending watch events. */ /* Cancel pending watch events. */
spin_lock(&watch_events_lock); spin_lock(&watch_events_lock);
list_for_each_entry_safe(msg, tmp, &watch_events, list) { list_for_each_entry_safe(event, tmp, &watch_events, list) {
if (msg->u.watch.handle != watch) if (event->handle != watch)
continue; continue;
list_del(&msg->list); list_del(&event->list);
kfree(msg->u.watch.path); kfree(event);
kfree(msg);
} }
spin_unlock(&watch_events_lock); spin_unlock(&watch_events_lock);
...@@ -777,10 +818,10 @@ EXPORT_SYMBOL_GPL(unregister_xenbus_watch); ...@@ -777,10 +818,10 @@ EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
void xs_suspend(void) void xs_suspend(void)
{ {
transaction_suspend(); xs_suspend_enter();
down_write(&xs_state.watch_mutex);
mutex_lock(&xs_state.request_mutex); down_write(&xs_watch_rwsem);
mutex_lock(&xs_state.response_mutex); mutex_lock(&xs_response_mutex);
} }
void xs_resume(void) void xs_resume(void)
...@@ -790,31 +831,31 @@ void xs_resume(void) ...@@ -790,31 +831,31 @@ void xs_resume(void)
xb_init_comms(); xb_init_comms();
mutex_unlock(&xs_state.response_mutex); mutex_unlock(&xs_response_mutex);
mutex_unlock(&xs_state.request_mutex);
transaction_resume(); xs_suspend_exit();
/* No need for watches_lock: the watch_mutex is sufficient. */ /* No need for watches_lock: the xs_watch_rwsem is sufficient. */
list_for_each_entry(watch, &watches, list) { list_for_each_entry(watch, &watches, list) {
sprintf(token, "%lX", (long)watch); sprintf(token, "%lX", (long)watch);
xs_watch(watch->node, token); xs_watch(watch->node, token);
} }
up_write(&xs_state.watch_mutex); up_write(&xs_watch_rwsem);
} }
void xs_suspend_cancel(void) void xs_suspend_cancel(void)
{ {
mutex_unlock(&xs_state.response_mutex); mutex_unlock(&xs_response_mutex);
mutex_unlock(&xs_state.request_mutex); up_write(&xs_watch_rwsem);
up_write(&xs_state.watch_mutex);
mutex_unlock(&xs_state.transaction_mutex); xs_suspend_exit();
} }
static int xenwatch_thread(void *unused) static int xenwatch_thread(void *unused)
{ {
struct list_head *ent; struct list_head *ent;
struct xs_stored_msg *msg; struct xs_watch_event *event;
for (;;) { for (;;) {
wait_event_interruptible(watch_events_waitq, wait_event_interruptible(watch_events_waitq,
...@@ -832,12 +873,10 @@ static int xenwatch_thread(void *unused) ...@@ -832,12 +873,10 @@ static int xenwatch_thread(void *unused)
spin_unlock(&watch_events_lock); spin_unlock(&watch_events_lock);
if (ent != &watch_events) { if (ent != &watch_events) {
msg = list_entry(ent, struct xs_stored_msg, list); event = list_entry(ent, struct xs_watch_event, list);
msg->u.watch.handle->callback(msg->u.watch.handle, event->handle->callback(event->handle, event->path,
msg->u.watch.path, event->token);
msg->u.watch.token); kfree(event);
kfree(msg->u.watch.path);
kfree(msg);
} }
mutex_unlock(&xenwatch_mutex); mutex_unlock(&xenwatch_mutex);
...@@ -846,126 +885,37 @@ static int xenwatch_thread(void *unused) ...@@ -846,126 +885,37 @@ static int xenwatch_thread(void *unused)
return 0; return 0;
} }
static int process_msg(void) /*
* Wake up all threads waiting for a xenstore reply. In case of shutdown all
* pending replies will be marked as "aborted" in order to let the waiters
* return in spite of xenstore possibly no longer being able to reply. This
* will avoid blocking shutdown by a thread waiting for xenstore but being
* necessary for shutdown processing to proceed.
*/
static int xs_reboot_notify(struct notifier_block *nb,
unsigned long code, void *unused)
{ {
struct xs_stored_msg *msg; struct xb_req_data *req;
char *body;
int err;
/*
* We must disallow save/restore while reading a xenstore message.
* A partial read across s/r leaves us out of sync with xenstored.
*/
for (;;) {
err = xb_wait_for_data_to_read();
if (err)
return err;
mutex_lock(&xs_state.response_mutex);
if (xb_data_to_read())
break;
/* We raced with save/restore: pending data 'disappeared'. */
mutex_unlock(&xs_state.response_mutex);
}
mutex_lock(&xb_write_mutex);
msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH); list_for_each_entry(req, &xs_reply_list, list)
if (msg == NULL) { wake_up(&req->wq);
err = -ENOMEM; list_for_each_entry(req, &xb_write_list, list)
goto out; wake_up(&req->wq);
} mutex_unlock(&xb_write_mutex);
return NOTIFY_DONE;
err = xb_read(&msg->hdr, sizeof(msg->hdr));
if (err) {
kfree(msg);
goto out;
}
if (msg->hdr.len > XENSTORE_PAYLOAD_MAX) {
kfree(msg);
err = -EINVAL;
goto out;
}
body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
if (body == NULL) {
kfree(msg);
err = -ENOMEM;
goto out;
}
err = xb_read(body, msg->hdr.len);
if (err) {
kfree(body);
kfree(msg);
goto out;
}
body[msg->hdr.len] = '\0';
if (msg->hdr.type == XS_WATCH_EVENT) {
if (count_strings(body, msg->hdr.len) != 2) {
err = -EINVAL;
kfree(msg);
kfree(body);
goto out;
}
msg->u.watch.path = (const char *)body;
msg->u.watch.token = (const char *)strchr(body, '\0') + 1;
spin_lock(&watches_lock);
msg->u.watch.handle = find_watch(msg->u.watch.token);
if (msg->u.watch.handle != NULL) {
spin_lock(&watch_events_lock);
list_add_tail(&msg->list, &watch_events);
wake_up(&watch_events_waitq);
spin_unlock(&watch_events_lock);
} else {
kfree(body);
kfree(msg);
}
spin_unlock(&watches_lock);
} else {
msg->u.reply.body = body;
spin_lock(&xs_state.reply_lock);
list_add_tail(&msg->list, &xs_state.reply_list);
spin_unlock(&xs_state.reply_lock);
wake_up(&xs_state.reply_waitq);
}
out:
mutex_unlock(&xs_state.response_mutex);
return err;
} }
static int xenbus_thread(void *unused) static struct notifier_block xs_reboot_nb = {
{ .notifier_call = xs_reboot_notify,
int err; };
for (;;) {
err = process_msg();
if (err)
pr_warn("error %d while reading message\n", err);
if (kthread_should_stop())
break;
}
return 0;
}
int xs_init(void) int xs_init(void)
{ {
int err; int err;
struct task_struct *task; struct task_struct *task;
INIT_LIST_HEAD(&xs_state.reply_list); register_reboot_notifier(&xs_reboot_nb);
spin_lock_init(&xs_state.reply_lock);
init_waitqueue_head(&xs_state.reply_waitq);
mutex_init(&xs_state.request_mutex);
mutex_init(&xs_state.response_mutex);
mutex_init(&xs_state.transaction_mutex);
init_rwsem(&xs_state.watch_mutex);
atomic_set(&xs_state.transaction_count, 0);
init_waitqueue_head(&xs_state.transaction_wq);
/* Initialize the shared memory rings to talk to xenstored */ /* Initialize the shared memory rings to talk to xenstored */
err = xb_init_comms(); err = xb_init_comms();
...@@ -977,10 +927,6 @@ int xs_init(void) ...@@ -977,10 +927,6 @@ int xs_init(void)
return PTR_ERR(task); return PTR_ERR(task);
xenwatch_pid = task->pid; xenwatch_pid = task->pid;
task = kthread_run(xenbus_thread, NULL, "xenbus");
if (IS_ERR(task))
return PTR_ERR(task);
/* shutdown watches for kexec boot */ /* shutdown watches for kexec boot */
xs_reset_watches(); xs_reset_watches();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment