Commit 6c64bf64 authored by Linus Torvalds

Make pipe data structure be a circular list of pages, rather than
a circular list of one page.

This improves pipe throughput, and allows us to (eventually)
use these lists of page buffers for moving data around efficiently.
parent 87475e1f
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include <linux/mount.h> #include <linux/mount.h>
#include <linux/pipe_fs_i.h> #include <linux/pipe_fs_i.h>
#include <linux/uio.h> #include <linux/uio.h>
#include <linux/highmem.h>
#include <asm/uaccess.h> #include <asm/uaccess.h>
#include <asm/ioctls.h> #include <asm/ioctls.h>
...@@ -89,6 +91,7 @@ pipe_readv(struct file *filp, const struct iovec *_iov, ...@@ -89,6 +91,7 @@ pipe_readv(struct file *filp, const struct iovec *_iov,
unsigned long nr_segs, loff_t *ppos) unsigned long nr_segs, loff_t *ppos)
{ {
struct inode *inode = filp->f_dentry->d_inode; struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info;
int do_wakeup; int do_wakeup;
ssize_t ret; ssize_t ret;
struct iovec *iov = (struct iovec *)_iov; struct iovec *iov = (struct iovec *)_iov;
...@@ -102,32 +105,40 @@ pipe_readv(struct file *filp, const struct iovec *_iov, ...@@ -102,32 +105,40 @@ pipe_readv(struct file *filp, const struct iovec *_iov,
do_wakeup = 0; do_wakeup = 0;
ret = 0; ret = 0;
down(PIPE_SEM(*inode)); down(PIPE_SEM(*inode));
info = inode->i_pipe;
for (;;) { for (;;) {
int size = PIPE_LEN(*inode); int bufs = info->nrbufs;
if (size) { if (bufs) {
char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode); int curbuf = info->curbuf;
ssize_t chars = PIPE_MAX_RCHUNK(*inode); struct pipe_buffer *buf = info->bufs + curbuf;
size_t chars = buf->len;
int error;
if (chars > total_len) if (chars > total_len)
chars = total_len; chars = total_len;
if (chars > size)
chars = size;
if (pipe_iov_copy_to_user(iov, pipebuf, chars)) { error = pipe_iov_copy_to_user(iov, kmap(buf->page) + buf->offset, chars);
kunmap(buf->page);
if (unlikely(error)) {
if (!ret) ret = -EFAULT; if (!ret) ret = -EFAULT;
break; break;
} }
ret += chars; ret += chars;
buf->offset += chars;
PIPE_START(*inode) += chars; buf->len -= chars;
PIPE_START(*inode) &= (PIPE_SIZE - 1); if (!buf->len) {
PIPE_LEN(*inode) -= chars; __free_page(buf->page);
buf->page = NULL;
curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
info->curbuf = curbuf;
info->nrbufs = --bufs;
do_wakeup = 1;
}
total_len -= chars; total_len -= chars;
do_wakeup = 1;
if (!total_len) if (!total_len)
break; /* common path: read succeeded */ break; /* common path: read succeeded */
} }
if (PIPE_LEN(*inode)) /* test for cyclic buffers */ if (bufs) /* More to do? */
continue; continue;
if (!PIPE_WRITERS(*inode)) if (!PIPE_WRITERS(*inode))
break; break;
...@@ -177,8 +188,8 @@ pipe_writev(struct file *filp, const struct iovec *_iov, ...@@ -177,8 +188,8 @@ pipe_writev(struct file *filp, const struct iovec *_iov,
unsigned long nr_segs, loff_t *ppos) unsigned long nr_segs, loff_t *ppos)
{ {
struct inode *inode = filp->f_dentry->d_inode; struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info;
ssize_t ret; ssize_t ret;
size_t min;
int do_wakeup; int do_wakeup;
struct iovec *iov = (struct iovec *)_iov; struct iovec *iov = (struct iovec *)_iov;
size_t total_len; size_t total_len;
...@@ -190,48 +201,58 @@ pipe_writev(struct file *filp, const struct iovec *_iov, ...@@ -190,48 +201,58 @@ pipe_writev(struct file *filp, const struct iovec *_iov,
do_wakeup = 0; do_wakeup = 0;
ret = 0; ret = 0;
min = total_len;
if (min > PIPE_BUF)
min = 1;
down(PIPE_SEM(*inode)); down(PIPE_SEM(*inode));
info = inode->i_pipe;
for (;;) { for (;;) {
int free; int bufs;
if (!PIPE_READERS(*inode)) { if (!PIPE_READERS(*inode)) {
send_sig(SIGPIPE, current, 0); send_sig(SIGPIPE, current, 0);
if (!ret) ret = -EPIPE; if (!ret) ret = -EPIPE;
break; break;
} }
free = PIPE_FREE(*inode); bufs = info->nrbufs;
if (free >= min) { if (bufs < PIPE_BUFFERS) {
/* transfer data */ ssize_t chars;
ssize_t chars = PIPE_MAX_WCHUNK(*inode); int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode); struct pipe_buffer *buf = info->bufs + newbuf;
struct page *page = alloc_page(GFP_USER);
int error;
if (unlikely(!page)) {
ret = ret ? : -ENOMEM;
break;
}
/* Always wakeup, even if the copy fails. Otherwise /* Always wakeup, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to * we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging. * syscall merging.
* FIXME! Is this really true?
*/ */
do_wakeup = 1; do_wakeup = 1;
chars = PAGE_SIZE;
if (chars > total_len) if (chars > total_len)
chars = total_len; chars = total_len;
if (chars > free)
chars = free;
if (pipe_iov_copy_from_user(pipebuf, iov, chars)) { error = pipe_iov_copy_from_user(kmap(page), iov, chars);
kunmap(page);
if (unlikely(error)) {
if (!ret) ret = -EFAULT; if (!ret) ret = -EFAULT;
__free_page(page);
break; break;
} }
ret += chars; ret += chars;
PIPE_LEN(*inode) += chars; /* Insert it into the buffer array */
buf->page = page;
buf->offset = 0;
buf->len = chars;
info->nrbufs = ++bufs;
total_len -= chars; total_len -= chars;
if (!total_len) if (!total_len)
break; break;
} }
if (PIPE_FREE(*inode) && ret) { if (bufs < PIPE_BUFFERS)
/* handle cyclic data buffers */
min = 1;
continue; continue;
}
if (filp->f_flags & O_NONBLOCK) { if (filp->f_flags & O_NONBLOCK) {
if (!ret) ret = -EAGAIN; if (!ret) ret = -EAGAIN;
break; break;
...@@ -283,9 +304,23 @@ static int ...@@ -283,9 +304,23 @@ static int
pipe_ioctl(struct inode *pino, struct file *filp, pipe_ioctl(struct inode *pino, struct file *filp,
unsigned int cmd, unsigned long arg) unsigned int cmd, unsigned long arg)
{ {
struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info;
int count, buf, nrbufs;
switch (cmd) { switch (cmd) {
case FIONREAD: case FIONREAD:
return put_user(PIPE_LEN(*pino), (int __user *)arg); down(PIPE_SEM(*inode));
info = inode->i_pipe;
count = 0;
buf = info->curbuf;
nrbufs = info->nrbufs;
while (--nrbufs >= 0) {
count += info->bufs[buf].len;
buf = (buf+1) & (PIPE_BUFFERS-1);
}
up(PIPE_SEM(*inode));
return put_user(count, (int __user *)arg);
default: default:
return -EINVAL; return -EINVAL;
} }
...@@ -297,13 +332,16 @@ pipe_poll(struct file *filp, poll_table *wait) ...@@ -297,13 +332,16 @@ pipe_poll(struct file *filp, poll_table *wait)
{ {
unsigned int mask; unsigned int mask;
struct inode *inode = filp->f_dentry->d_inode; struct inode *inode = filp->f_dentry->d_inode;
struct pipe_inode_info *info = inode->i_pipe;
int nrbufs;
poll_wait(filp, PIPE_WAIT(*inode), wait); poll_wait(filp, PIPE_WAIT(*inode), wait);
/* Reading only -- no need for acquiring the semaphore. */ /* Reading only -- no need for acquiring the semaphore. */
mask = POLLIN | POLLRDNORM; nrbufs = info->nrbufs;
if (PIPE_EMPTY(*inode)) mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0;
mask = POLLOUT | POLLWRNORM; mask |= (nrbufs < PIPE_BUFFERS) ? POLLOUT | POLLWRNORM : 0;
if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode)) if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
mask |= POLLHUP; mask |= POLLHUP;
if (!PIPE_READERS(*inode)) if (!PIPE_READERS(*inode))
...@@ -529,31 +567,37 @@ struct file_operations rdwr_pipe_fops = { ...@@ -529,31 +567,37 @@ struct file_operations rdwr_pipe_fops = {
void free_pipe_info(struct inode *inode) void free_pipe_info(struct inode *inode)
{ {
int i;
struct pipe_inode_info *info = inode->i_pipe; struct pipe_inode_info *info = inode->i_pipe;
inode->i_pipe = NULL; inode->i_pipe = NULL;
free_page((unsigned long)info->base); for (i = 0; i < PIPE_BUFFERS; i++) {
struct page *page = info->bufs[i].page;
/* We'll make this a data-dependent free some day .. */
if (page)
__free_page(page);
}
kfree(info); kfree(info);
} }
struct inode* pipe_new(struct inode* inode) struct inode* pipe_new(struct inode* inode)
{ {
unsigned long page; unsigned long page;
struct pipe_inode_info *info;
page = __get_free_page(GFP_USER); page = __get_free_page(GFP_USER);
if (!page) if (!page)
return NULL; return NULL;
inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); info = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
if (!inode->i_pipe) if (!info)
goto fail_page; goto fail_page;
memset(info, 0, sizeof(*info));
inode->i_pipe = info;
init_waitqueue_head(PIPE_WAIT(*inode)); init_waitqueue_head(PIPE_WAIT(*inode));
PIPE_BASE(*inode) = (char*) page;
PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
PIPE_WAITING_WRITERS(*inode) = 0;
PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1; PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
*PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL;
return inode; return inode;
fail_page: fail_page:
......
...@@ -2,10 +2,18 @@ ...@@ -2,10 +2,18 @@
#define _LINUX_PIPE_FS_I_H #define _LINUX_PIPE_FS_I_H
#define PIPEFS_MAGIC 0x50495045 #define PIPEFS_MAGIC 0x50495045
#define PIPE_BUFFERS (16)
/*
 * One slot of the pipe's circular array of page buffers.
 *
 * page   - page holding this slot's data; set to NULL once the reader
 *          has consumed the slot and freed the page.
 * offset - byte offset within the page of the first unread byte.
 * len    - number of unread bytes remaining, starting at offset.
 *
 * offset and len are unsigned int rather than unsigned short: a writer
 * may place up to PAGE_SIZE bytes in a single slot, and a PAGE_SIZE of
 * 65536 does not fit in 16 bits (len would silently wrap to 0 and the
 * slot would look empty).
 */
struct pipe_buffer {
	struct page *page;
	unsigned int offset, len;
};
struct pipe_inode_info { struct pipe_inode_info {
wait_queue_head_t wait; wait_queue_head_t wait;
char *base; unsigned int nrbufs, curbuf;
unsigned int len; struct pipe_buffer bufs[PIPE_BUFFERS];
unsigned int start; unsigned int start;
unsigned int readers; unsigned int readers;
unsigned int writers; unsigned int writers;
...@@ -33,13 +41,6 @@ struct pipe_inode_info { ...@@ -33,13 +41,6 @@ struct pipe_inode_info {
#define PIPE_FASYNC_READERS(inode) (&((inode).i_pipe->fasync_readers)) #define PIPE_FASYNC_READERS(inode) (&((inode).i_pipe->fasync_readers))
#define PIPE_FASYNC_WRITERS(inode) (&((inode).i_pipe->fasync_writers)) #define PIPE_FASYNC_WRITERS(inode) (&((inode).i_pipe->fasync_writers))
#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE)
#define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode))
#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode))
#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))
/* Drop the inode semaphore and wait for a pipe event, atomically */ /* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode); void pipe_wait(struct inode * inode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment