Commit 34b98f22 authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] posix message queues: send notifications via netlink

From: Manfred Spraul <manfred@colorfullife.com>

SIGEV_THREAD means that a given callback should be called in the context of a
new thread.  This must be done by the C library.  The kernel must deliver a
notice of the event to the C library when the callback should be called.

This patch switches to a new, simpler interface: User space creates a socket
with socket(PF_NETLINK, SOCK_RAW,0) and passes the fd to the mq_notify call
together with a cookie.  When the mq_notify() condition is satisfied, the
kernel "writes" the cookie to the socket.  User space then reads the cookie
and calls the appropriate callback.
parent ed6dcf4a
...@@ -30,8 +30,24 @@ struct mq_attr { ...@@ -30,8 +30,24 @@ struct mq_attr {
long __reserved[4]; /* ignored for input, zeroed for output */ long __reserved[4]; /* ignored for input, zeroed for output */
}; };
/*
* SIGEV_THREAD implementation:
* SIGEV_THREAD must be implemented in user space. If SIGEV_THREAD is passed
* to mq_notify, then
* - sigev_signo must be the file descriptor of an AF_NETLINK socket. It's not
* necessary that the socket is bound.
* - sigev_value.sival_ptr must point to a cookie that is NOTIFY_COOKIE_LEN
* bytes long.
* If the notification is triggered, then the cookie is sent to the netlink
* socket. The last byte of the cookie is replaced with the NOTIFY_?? codes:
* NOTIFY_WOKENUP if the notification got triggered, NOTIFY_REMOVED if it was
* removed, either due to a close() on the message queue fd or due to a
* mq_notify() that removed the notification.
*/
#define NOTIFY_NONE 0 #define NOTIFY_NONE 0
#define NOTIFY_WOKENUP 1 #define NOTIFY_WOKENUP 1
#define NOTIFY_REMOVED 2 #define NOTIFY_REMOVED 2
#define NOTIFY_COOKIE_LEN 32
#endif #endif
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
#include <linux/poll.h> #include <linux/poll.h>
#include <linux/mqueue.h> #include <linux/mqueue.h>
#include <linux/msg.h> #include <linux/msg.h>
#include <linux/skbuff.h>
#include <linux/netlink.h>
#include <net/sock.h>
#include "util.h" #include "util.h"
#define MQUEUE_MAGIC 0x19800202 #define MQUEUE_MAGIC 0x19800202
...@@ -33,9 +36,6 @@ ...@@ -33,9 +36,6 @@
#define STATE_PENDING 1 #define STATE_PENDING 1
#define STATE_READY 2 #define STATE_READY 2
#define NP_NONE ((void*)NOTIFY_NONE)
#define NP_WOKENUP ((void*)NOTIFY_WOKENUP)
#define NP_REMOVED ((void*)NOTIFY_REMOVED)
/* used by sysctl */ /* used by sysctl */
#define FS_MQUEUE 1 #define FS_MQUEUE 1
#define CTL_QUEUESMAX 2 #define CTL_QUEUESMAX 2
...@@ -48,6 +48,8 @@ ...@@ -48,6 +48,8 @@
#define HARD_MSGMAX (131072/sizeof(void*)) #define HARD_MSGMAX (131072/sizeof(void*))
#define DFLT_MSGSIZEMAX 16384 /* max message size */ #define DFLT_MSGSIZEMAX 16384 /* max message size */
#define NOTIFY_COOKIE_LEN 32
struct ext_wait_queue { /* queue of sleeping tasks */ struct ext_wait_queue { /* queue of sleeping tasks */
struct task_struct *task; struct task_struct *task;
struct list_head list; struct list_head list;
...@@ -56,25 +58,26 @@ struct ext_wait_queue { /* queue of sleeping tasks */ ...@@ -56,25 +58,26 @@ struct ext_wait_queue { /* queue of sleeping tasks */
}; };
struct mqueue_inode_info { struct mqueue_inode_info {
struct mq_attr attr; spinlock_t lock;
struct inode vfs_inode;
wait_queue_head_t wait_q;
struct msg_msg **messages; struct msg_msg **messages;
struct mq_attr attr;
pid_t notify_owner; /* != 0 means notification registered */ struct sigevent notify; /* notify.sigev_notify == SIGEV_NONE means */
struct sigevent notify; pid_t notify_owner; /* no notification registered */
struct file *notify_filp; struct sock *notify_sock;
struct sk_buff *notify_cookie;
/* for tasks waiting for free space and messages, respectively */ /* for tasks waiting for free space and messages, respectively */
struct ext_wait_queue e_wait_q[2]; struct ext_wait_queue e_wait_q[2];
wait_queue_head_t wait_q;
unsigned long qsize; /* size of queue in memory (sum of all msgs) */ unsigned long qsize; /* size of queue in memory (sum of all msgs) */
spinlock_t lock;
struct inode vfs_inode;
}; };
static struct inode_operations mqueue_dir_inode_operations; static struct inode_operations mqueue_dir_inode_operations;
static struct file_operations mqueue_file_operations; static struct file_operations mqueue_file_operations;
static struct file_operations mqueue_notify_fops;
static struct super_operations mqueue_super_ops; static struct super_operations mqueue_super_ops;
static void remove_notification(struct mqueue_inode_info *info); static void remove_notification(struct mqueue_inode_info *info);
...@@ -119,7 +122,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, int mode) ...@@ -119,7 +122,7 @@ static struct inode *mqueue_get_inode(struct super_block *sb, int mode)
init_waitqueue_head(&info->wait_q); init_waitqueue_head(&info->wait_q);
INIT_LIST_HEAD(&info->e_wait_q[0].list); INIT_LIST_HEAD(&info->e_wait_q[0].list);
INIT_LIST_HEAD(&info->e_wait_q[1].list); INIT_LIST_HEAD(&info->e_wait_q[1].list);
info->notify_owner = 0; info->notify.sigev_notify = SIGEV_NONE;
info->qsize = 0; info->qsize = 0;
memset(&info->attr, 0, sizeof(info->attr)); memset(&info->attr, 0, sizeof(info->attr));
info->attr.mq_maxmsg = DFLT_MSGMAX; info->attr.mq_maxmsg = DFLT_MSGMAX;
...@@ -283,10 +286,11 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data, ...@@ -283,10 +286,11 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data,
snprintf(buffer, sizeof(buffer), snprintf(buffer, sizeof(buffer),
"QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n", "QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
info->qsize, info->qsize,
info->notify_owner ? info->notify.sigev_notify : SIGEV_NONE, info->notify.sigev_notify,
(info->notify_owner && info->notify.sigev_notify == SIGEV_SIGNAL ) ? (info->notify.sigev_notify == SIGEV_SIGNAL ) ?
info->notify.sigev_signo : 0, info->notify.sigev_signo : 0,
info->notify_owner); (info->notify.sigev_notify != SIGEV_NONE) ?
info->notify_owner : 0);
spin_unlock(&info->lock); spin_unlock(&info->lock);
buffer[sizeof(buffer)-1] = '\0'; buffer[sizeof(buffer)-1] = '\0';
slen = strlen(buffer)+1; slen = strlen(buffer)+1;
...@@ -299,7 +303,7 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data, ...@@ -299,7 +303,7 @@ static ssize_t mqueue_read_file(struct file *filp, char __user *u_data,
count = slen - o; count = slen - o;
if (copy_to_user(u_data, buffer + o, count)) if (copy_to_user(u_data, buffer + o, count))
return -EFAULT; return -EFAULT;
*off = o + count; *off = o + count;
filp->f_dentry->d_inode->i_atime = filp->f_dentry->d_inode->i_ctime = CURRENT_TIME; filp->f_dentry->d_inode->i_atime = filp->f_dentry->d_inode->i_ctime = CURRENT_TIME;
...@@ -311,7 +315,8 @@ static int mqueue_flush_file(struct file *filp) ...@@ -311,7 +315,8 @@ static int mqueue_flush_file(struct file *filp)
struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode); struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
spin_lock(&info->lock); spin_lock(&info->lock);
if (current->tgid == info->notify_owner) if (info->notify.sigev_notify != SIGEV_NONE &&
current->tgid == info->notify_owner)
remove_notification(info); remove_notification(info);
spin_unlock(&info->lock); spin_unlock(&info->lock);
...@@ -435,6 +440,11 @@ static inline struct msg_msg *msg_get(struct mqueue_inode_info *info) ...@@ -435,6 +440,11 @@ static inline struct msg_msg *msg_get(struct mqueue_inode_info *info)
return info->messages[info->attr.mq_curmsgs]; return info->messages[info->attr.mq_curmsgs];
} }
/*
 * set_cookie() - store a notification status code in a cookie skb.
 * @skb:  buffer holding the notification cookie
 * @code: status byte to store; the callers visible in this file pass
 *        NOTIFY_WOKENUP or NOTIFY_REMOVED
 *
 * Overwrites the byte at offset NOTIFY_COOKIE_LEN-1 of skb->data, i.e. the
 * last byte of the cookie.  No length check is done here; presumably the skb
 * always carries at least NOTIFY_COOKIE_LEN bytes of data -- verify against
 * the allocation in sys_mq_notify().
 */
static inline void set_cookie(struct sk_buff *skb, char code)
{
((char*)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
}
/* /*
* The next function is only to split too long sys_mq_timedsend * The next function is only to split too long sys_mq_timedsend
*/ */
...@@ -445,7 +455,8 @@ static void __do_notify(struct mqueue_inode_info *info) ...@@ -445,7 +455,8 @@ static void __do_notify(struct mqueue_inode_info *info)
* waiting synchronously for message AND state of queue changed from * waiting synchronously for message AND state of queue changed from
* empty to not empty. Here we are sure that no one is waiting * empty to not empty. Here we are sure that no one is waiting
* synchronously. */ * synchronously. */
if (info->notify_owner && info->attr.mq_curmsgs == 1) { if (info->notify.sigev_notify != SIGEV_NONE &&
info->attr.mq_curmsgs == 1) {
/* sends signal */ /* sends signal */
if (info->notify.sigev_notify == SIGEV_SIGNAL) { if (info->notify.sigev_notify == SIGEV_SIGNAL) {
struct siginfo sig_i; struct siginfo sig_i;
...@@ -460,10 +471,12 @@ static void __do_notify(struct mqueue_inode_info *info) ...@@ -460,10 +471,12 @@ static void __do_notify(struct mqueue_inode_info *info)
kill_proc_info(info->notify.sigev_signo, kill_proc_info(info->notify.sigev_signo,
&sig_i, info->notify_owner); &sig_i, info->notify_owner);
} else if (info->notify.sigev_notify == SIGEV_THREAD) { } else if (info->notify.sigev_notify == SIGEV_THREAD) {
info->notify_filp->private_data = (void*)NP_WOKENUP; set_cookie(info->notify_cookie, NOTIFY_WOKENUP);
netlink_sendskb(info->notify_sock,
info->notify_cookie, 0);
} }
/* after notification unregisters process */ /* after notification unregisters process */
info->notify_owner = 0; info->notify.sigev_notify = SIGEV_NONE;
} }
wake_up(&info->wait_q); wake_up(&info->wait_q);
} }
...@@ -499,90 +512,13 @@ static long prepare_timeout(const struct timespec __user *u_arg) ...@@ -499,90 +512,13 @@ static long prepare_timeout(const struct timespec __user *u_arg)
return timeout; return timeout;
} }
/*
* File descriptor based notification, intended to be used to implement
* SIGEV_THREAD:
* SIGEV_THREAD means that a notification function should be called in the
* context of a new thread. The kernel can't do that. Therefore mq_notify
* calls with SIGEV_THREAD return a new file descriptor. A user space helper
* must create a new thread and then read from the given file descriptor.
* The read always returns one byte. If it's NOTIFY_WOKENUP, then it must
* call the notification function. If it's NOTIFY_REMOVED, then the
* notification was removed. The file descriptor supports poll, thus one
* supervisor thread can manage multiple message queue notifications.
*
* The implementation must support multiple outstanding notifications:
* It's possible that a new notification is added and signaled before user
* space calls mqueue_notify_read for the previous notification.
* Therefore the notification state is stored in the private_data field of
* the file descriptor.
*/
/*
 * Poll handler for the notification file descriptor.
 *
 * Registers the caller on the queue's wait_q and returns POLLIN | POLLRDNORM
 * once filp->private_data holds anything other than NP_NONE; returns 0 while
 * it is still NP_NONE.  Note that private_data is read here without taking
 * info->lock.
 */
static unsigned int mqueue_notify_poll(struct file *filp,
struct poll_table_struct *poll_tab)
{
struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
int retval;
poll_wait(filp, &info->wait_q, poll_tab);
/* NP_NONE means no notification state has been stored yet */
if (filp->private_data == NP_NONE)
retval = 0;
else
retval = POLLIN | POLLRDNORM;
return retval;
}
/*
 * Read handler for the notification file descriptor.
 *
 * Returns 0 if count is 0 or if *ppos is non-zero.  Otherwise waits until
 * filp->private_data is no longer NP_NONE, copies that value to user space
 * as a single byte, sets *ppos to 1 and returns 1.
 *
 * Returns -EAGAIN if the file is O_NONBLOCK and no state is available yet,
 * and -EFAULT if the byte cannot be written to buf.
 */
static ssize_t mqueue_notify_read(struct file *filp, char __user *buf,
size_t count, loff_t *ppos)
{
struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
char result;
if (!count)
return 0;
/* a non-zero position makes any further read return 0 */
if (*ppos != 0)
return 0;
spin_lock(&info->lock);
while (filp->private_data == NP_NONE) {
DEFINE_WAIT(wait);
if (filp->f_flags & O_NONBLOCK) {
spin_unlock(&info->lock);
return -EAGAIN;
}
/*
 * Queue ourselves on wait_q before dropping the lock, then sleep.
 * NOTE(review): the sleep is TASK_INTERRUPTIBLE but the loop has no
 * signal_pending() check -- confirm how a pending signal is meant to
 * be handled here.
 */
prepare_to_wait(&info->wait_q, &wait, TASK_INTERRUPTIBLE);
spin_unlock(&info->lock);
schedule();
finish_wait(&info->wait_q, &wait);
spin_lock(&info->lock);
}
spin_unlock(&info->lock);
/* private_data stores the notification code directly in the pointer value */
result = (char)(unsigned long)filp->private_data;
if (put_user(result, buf))
return -EFAULT;
*ppos = 1;
return 1;
}
/*
 * Release handler for the notification file descriptor.
 *
 * Under info->lock: if a notification is registered (notify_owner != 0) and
 * this file is the registered notify_filp, the registration is cleared by
 * zeroing notify_owner.  private_data is then set to NP_REMOVED in all cases.
 * Always returns 0.
 */
static int mqueue_notify_release(struct inode *inode, struct file *filp)
{
struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
spin_lock(&info->lock);
if (info->notify_owner && info->notify_filp == filp)
info->notify_owner = 0;
filp->private_data = NP_REMOVED;
spin_unlock(&info->lock);
return 0;
}
static void remove_notification(struct mqueue_inode_info *info) static void remove_notification(struct mqueue_inode_info *info)
{ {
if (info->notify.sigev_notify == SIGEV_THREAD) { if (info->notify.sigev_notify == SIGEV_THREAD) {
info->notify_filp->private_data = NP_REMOVED; set_cookie(info->notify_cookie, NOTIFY_REMOVED);
wake_up(&info->wait_q); netlink_sendskb(info->notify_sock, info->notify_cookie, 0);
} }
info->notify_owner = 0; info->notify.sigev_notify = SIGEV_NONE;
} }
/* /*
...@@ -780,7 +716,8 @@ asmlinkage long sys_mq_unlink(const char __user *u_name) ...@@ -780,7 +716,8 @@ asmlinkage long sys_mq_unlink(const char __user *u_name)
*/ */
/* pipelined_send() - send a message directly to the task waiting in /* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue). */ * sys_mq_timedreceive() (without inserting message into a queue).
*/
static inline void pipelined_send(struct mqueue_inode_info *info, static inline void pipelined_send(struct mqueue_inode_info *info,
struct msg_msg *message, struct msg_msg *message,
struct ext_wait_queue *receiver) struct ext_wait_queue *receiver)
...@@ -978,12 +915,16 @@ asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr, ...@@ -978,12 +915,16 @@ asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr,
asmlinkage long sys_mq_notify(mqd_t mqdes, asmlinkage long sys_mq_notify(mqd_t mqdes,
const struct sigevent __user *u_notification) const struct sigevent __user *u_notification)
{ {
int ret, fd; int ret;
struct file *filp, *nfilp; struct file *filp;
struct sock *sock;
struct inode *inode; struct inode *inode;
struct sigevent notification; struct sigevent notification;
struct mqueue_inode_info *info; struct mqueue_inode_info *info;
struct sk_buff *nc;
nc = NULL;
sock = NULL;
if (u_notification == NULL) { if (u_notification == NULL) {
notification.sigev_notify = SIGEV_NONE; notification.sigev_notify = SIGEV_NONE;
} else { } else {
...@@ -1000,6 +941,44 @@ asmlinkage long sys_mq_notify(mqd_t mqdes, ...@@ -1000,6 +941,44 @@ asmlinkage long sys_mq_notify(mqd_t mqdes,
notification.sigev_signo > _NSIG)) { notification.sigev_signo > _NSIG)) {
return -EINVAL; return -EINVAL;
} }
if (notification.sigev_notify == SIGEV_THREAD) {
/* create the notify skb */
nc = alloc_skb(NOTIFY_COOKIE_LEN, GFP_KERNEL);
ret = -ENOMEM;
if (!nc)
goto out;
ret = -EFAULT;
if (copy_from_user(nc->data,
notification.sigev_value.sival_ptr,
NOTIFY_COOKIE_LEN)) {
goto out;
}
/* TODO: add a header? */
skb_put(nc, NOTIFY_COOKIE_LEN);
/* and attach it to the socket */
retry:
filp = fget(notification.sigev_signo);
ret = -EBADF;
if (!filp)
goto out;
sock = netlink_getsockbyfilp(filp);
fput(filp);
if (IS_ERR(sock)) {
ret = PTR_ERR(sock);
sock = NULL;
goto out;
}
ret = netlink_attachskb(sock, nc, 0, MAX_SCHEDULE_TIMEOUT);
if (ret == 1)
goto retry;
if (ret) {
sock = NULL;
nc = NULL;
goto out;
}
}
} }
ret = -EBADF; ret = -EBADF;
...@@ -1013,47 +992,33 @@ asmlinkage long sys_mq_notify(mqd_t mqdes, ...@@ -1013,47 +992,33 @@ asmlinkage long sys_mq_notify(mqd_t mqdes,
info = MQUEUE_I(inode); info = MQUEUE_I(inode);
ret = 0; ret = 0;
if (notification.sigev_notify == SIGEV_THREAD) {
ret = get_unused_fd();
if (ret < 0)
goto out_fput;
fd = ret;
nfilp = get_empty_filp();
if (!nfilp) {
ret = -ENFILE;
goto out_dropfd;
}
nfilp->private_data = NP_NONE;
nfilp->f_op = &mqueue_notify_fops;
nfilp->f_vfsmnt = mntget(mqueue_mnt);
nfilp->f_dentry = dget(filp->f_dentry);
nfilp->f_mapping = filp->f_dentry->d_inode->i_mapping;
nfilp->f_flags = O_RDONLY;
nfilp->f_mode = FMODE_READ;
} else {
nfilp = NULL;
fd = -1;
}
spin_lock(&info->lock); spin_lock(&info->lock);
switch (notification.sigev_notify) {
if (notification.sigev_notify == SIGEV_NONE) { case SIGEV_NONE:
if (info->notify_owner == current->tgid) { if (info->notify.sigev_notify != SIGEV_NONE &&
info->notify_owner == current->tgid) {
remove_notification(info); remove_notification(info);
inode->i_atime = inode->i_ctime = CURRENT_TIME; inode->i_atime = inode->i_ctime = CURRENT_TIME;
} }
} else if (info->notify_owner) { break;
ret = -EBUSY; case SIGEV_THREAD:
} else if (notification.sigev_notify == SIGEV_THREAD) { if (info->notify.sigev_notify != SIGEV_NONE) {
info->notify_filp = nfilp; ret = -EBUSY;
fd_install(fd, nfilp); break;
ret = fd; }
fd = -1; info->notify_sock = sock;
nfilp = NULL; info->notify_cookie = nc;
sock = NULL;
nc = NULL;
info->notify.sigev_notify = SIGEV_THREAD; info->notify.sigev_notify = SIGEV_THREAD;
info->notify_owner = current->tgid; info->notify_owner = current->tgid;
inode->i_atime = inode->i_ctime = CURRENT_TIME; inode->i_atime = inode->i_ctime = CURRENT_TIME;
} else { break;
case SIGEV_SIGNAL:
if (info->notify.sigev_notify != SIGEV_NONE) {
ret = -EBUSY;
break;
}
info->notify.sigev_signo = notification.sigev_signo; info->notify.sigev_signo = notification.sigev_signo;
info->notify.sigev_value = notification.sigev_value; info->notify.sigev_value = notification.sigev_value;
info->notify.sigev_notify = SIGEV_SIGNAL; info->notify.sigev_notify = SIGEV_SIGNAL;
...@@ -1061,12 +1026,14 @@ asmlinkage long sys_mq_notify(mqd_t mqdes, ...@@ -1061,12 +1026,14 @@ asmlinkage long sys_mq_notify(mqd_t mqdes,
inode->i_atime = inode->i_ctime = CURRENT_TIME; inode->i_atime = inode->i_ctime = CURRENT_TIME;
} }
spin_unlock(&info->lock); spin_unlock(&info->lock);
out_dropfd:
if (fd != -1)
put_unused_fd(fd);
out_fput: out_fput:
fput(filp); fput(filp);
out: out:
if (sock) {
netlink_detachskb(sock, nc);
} else if (nc) {
dev_kfree_skb(nc);
}
return ret; return ret;
} }
...@@ -1135,13 +1102,6 @@ static struct file_operations mqueue_file_operations = { ...@@ -1135,13 +1102,6 @@ static struct file_operations mqueue_file_operations = {
.read = mqueue_read_file, .read = mqueue_read_file,
}; };
static struct file_operations mqueue_notify_fops = {
.poll = mqueue_notify_poll,
.read = mqueue_notify_read,
.release = mqueue_notify_release,
};
static struct super_operations mqueue_super_ops = { static struct super_operations mqueue_super_ops = {
.alloc_inode = mqueue_alloc_inode, .alloc_inode = mqueue_alloc_inode,
.destroy_inode = mqueue_destroy_inode, .destroy_inode = mqueue_destroy_inode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment