Commit 0f16a419 authored by Rusty Russell's avatar Rusty Russell

ccan/io: io_set_finish()

Rather than insisting on supplying them on every call to io_new_conn().
Also, this way it can be changed on a connection.
Signed-off-by: default avatarRusty Russell <rusty@rustcorp.com.au>
parent e2ce04ea
...@@ -114,10 +114,8 @@ ...@@ -114,10 +114,8 @@
* sbuf.len = sizeof(sbuf.inbuf); * sbuf.len = sizeof(sbuf.inbuf);
* sbuf.reader = io_new_conn(STDIN_FILENO, * sbuf.reader = io_new_conn(STDIN_FILENO,
* io_read_partial(sbuf.inbuf, &sbuf.len, * io_read_partial(sbuf.inbuf, &sbuf.len,
* wake_writer, &sbuf), * wake_writer, &sbuf));
* reader_exit, &sbuf); * sbuf.writer = io_new_conn(tochild[1], io_idle());
* sbuf.writer = io_new_conn(tochild[1], io_idle(), fail_child_write,
* &sbuf);
* *
* out.max = 128; * out.max = 128;
* out.off = 0; * out.off = 0;
...@@ -125,11 +123,13 @@ ...@@ -125,11 +123,13 @@
* out.buf = malloc(out.max); * out.buf = malloc(out.max);
* from_child = io_new_conn(fromchild[0], * from_child = io_new_conn(fromchild[0],
* io_read_partial(out.buf, &out.rlen, * io_read_partial(out.buf, &out.rlen,
* read_from_child, &out), * read_from_child, &out));
* NULL, NULL);
* if (!sbuf.reader || !sbuf.writer || !from_child) * if (!sbuf.reader || !sbuf.writer || !from_child)
* err(1, "Allocating connections"); * err(1, "Allocating connections");
* *
* io_set_finish(sbuf.reader, reader_exit, &sbuf);
* io_set_finish(sbuf.writer, fail_child_write, &sbuf);
*
* io_loop(); * io_loop();
* wait(&status); * wait(&status);
* *
......
...@@ -152,12 +152,11 @@ int main(int argc, char *argv[]) ...@@ -152,12 +152,11 @@ int main(int argc, char *argv[])
/* For efficiency, we share client structure */ /* For efficiency, we share client structure */
io_new_conn(ret, io_new_conn(ret,
io_read(client.request_buffer, REQUEST_SIZE, io_read(client.request_buffer, REQUEST_SIZE,
write_reply, &client), write_reply, &client));
NULL, NULL);
} }
} }
io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL); io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
close(wake[0]); close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++) for (i = 0; i < NUM_CHILDREN; i++)
......
...@@ -157,11 +157,11 @@ int main(int argc, char *argv[]) ...@@ -157,11 +157,11 @@ int main(int argc, char *argv[])
err(1, "Accepting fd"); err(1, "Accepting fd");
/* For efficiency, we share buffer */ /* For efficiency, we share buffer */
client->request_buffer = buffer; client->request_buffer = buffer;
io_new_conn(ret, io_read_header(client), NULL, NULL); io_new_conn(ret, io_read_header(client));
} }
} }
io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf), NULL, NULL); io_new_conn(timeout[0], io_read(&buf, 1, do_timeout, &buf));
close(wake[0]); close(wake[0]);
for (i = 0; i < NUM_CHILDREN; i++) for (i = 0; i < NUM_CHILDREN; i++)
......
...@@ -66,14 +66,13 @@ int main(void) ...@@ -66,14 +66,13 @@ int main(void)
memset(buf[i].buf, i, sizeof(buf[i].buf)); memset(buf[i].buf, i, sizeof(buf[i].buf));
sprintf(buf[i].buf, "%i-%i", i, i); sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); buf[i].reader = io_new_conn(last_read, io_idle());
if (!buf[i].reader) if (!buf[i].reader)
err(1, "Creating reader %i", i); err(1, "Creating reader %i", i);
buf[i].writer = io_new_conn(fds[1], buf[i].writer = io_new_conn(fds[1],
io_write(&buf[i].buf, io_write(&buf[i].buf,
sizeof(buf[i].buf), sizeof(buf[i].buf),
poke_reader, &buf[i]), poke_reader, &buf[i]));
NULL, NULL);
if (!buf[i].writer) if (!buf[i].writer)
err(1, "Creating writer %i", i); err(1, "Creating writer %i", i);
last_read = fds[0]; last_read = fds[0];
...@@ -83,13 +82,12 @@ int main(void) ...@@ -83,13 +82,12 @@ int main(void)
i = 0; i = 0;
buf[i].iters = 0; buf[i].iters = 0;
sprintf(buf[i].buf, "%i-%i", i, i); sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); buf[i].reader = io_new_conn(last_read, io_idle());
if (!buf[i].reader) if (!buf[i].reader)
err(1, "Creating reader %i", i); err(1, "Creating reader %i", i);
buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf, buf[i].writer = io_new_conn(last_write, io_write(&buf[i].buf,
sizeof(buf[i].buf), sizeof(buf[i].buf),
poke_reader, &buf[i]), poke_reader, &buf[i]));
NULL, NULL);
if (!buf[i].writer) if (!buf[i].writer)
err(1, "Creating writer %i", i); err(1, "Creating writer %i", i);
......
...@@ -78,10 +78,7 @@ void io_close_listener(struct io_listener *l) ...@@ -78,10 +78,7 @@ void io_close_listener(struct io_listener *l)
free(l); free(l);
} }
struct io_conn *io_new_conn_(int fd, struct io_conn *io_new_conn_(int fd, struct io_plan plan)
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg)
{ {
struct io_conn *conn = malloc(sizeof(*conn)); struct io_conn *conn = malloc(sizeof(*conn));
...@@ -91,8 +88,8 @@ struct io_conn *io_new_conn_(int fd, ...@@ -91,8 +88,8 @@ struct io_conn *io_new_conn_(int fd,
conn->fd.listener = false; conn->fd.listener = false;
conn->fd.fd = fd; conn->fd.fd = fd;
conn->plan = plan; conn->plan = plan;
conn->finish = finish; conn->finish = NULL;
conn->finish_arg = arg; conn->finish_arg = NULL;
conn->duplex = NULL; conn->duplex = NULL;
conn->timeout = NULL; conn->timeout = NULL;
if (!add_conn(conn)) { if (!add_conn(conn)) {
...@@ -102,10 +99,15 @@ struct io_conn *io_new_conn_(int fd, ...@@ -102,10 +99,15 @@ struct io_conn *io_new_conn_(int fd,
return conn; return conn;
} }
struct io_conn *io_duplex_(struct io_conn *old, void io_set_finish_(struct io_conn *conn,
struct io_plan plan,
void (*finish)(struct io_conn *, void *), void (*finish)(struct io_conn *, void *),
void *arg) void *arg)
{
conn->finish = finish;
conn->finish_arg = arg;
}
struct io_conn *io_duplex_(struct io_conn *old, struct io_plan plan)
{ {
struct io_conn *conn; struct io_conn *conn;
...@@ -119,8 +121,8 @@ struct io_conn *io_duplex_(struct io_conn *old, ...@@ -119,8 +121,8 @@ struct io_conn *io_duplex_(struct io_conn *old,
conn->fd.fd = old->fd.fd; conn->fd.fd = old->fd.fd;
conn->plan = plan; conn->plan = plan;
conn->duplex = old; conn->duplex = old;
conn->finish = finish; conn->finish = NULL;
conn->finish_arg = arg; conn->finish_arg = NULL;
conn->timeout = NULL; conn->timeout = NULL;
if (!add_duplex(conn)) { if (!add_duplex(conn)) {
free(conn); free(conn);
......
...@@ -67,23 +67,32 @@ static inline void io_plan_debug(struct io_plan *plan) { } ...@@ -67,23 +67,32 @@ static inline void io_plan_debug(struct io_plan *plan) { }
* io_new_conn - create a new connection. * io_new_conn - create a new connection.
* @fd: the file descriptor. * @fd: the file descriptor.
* @plan: the first I/O function. * @plan: the first I/O function.
* @finish: the function to call when it's closed or fails.
* @arg: the argument to @finish.
* *
* This creates a connection which owns @fd. @plan will be called on the * This creates a connection which owns @fd. @plan will be called on the
* next io_loop(), and @finish will be called when an I/O operation * next io_loop().
* fails, or you call io_close() on the connection.
* *
* Returns NULL on error (and sets errno). * Returns NULL on error (and sets errno).
*/ */
#define io_new_conn(fd, plan, finish, arg) \ #define io_new_conn(fd, plan) \
(io_plan_other(), io_new_conn_((fd), (plan), \ (io_plan_other(), io_new_conn_((fd), (plan)))
struct io_conn *io_new_conn_(int fd, struct io_plan plan);
/**
* io_set_finish - set finish function on a connection.
* @conn: the connection.
* @finish: the function to call when it's closed or fails.
* @arg: the argument to @finish.
*
* @finish will be called when an I/O operation fails, or you call
* io_close() on the connection.
*/
#define io_set_finish(conn, finish, arg) \
io_set_finish_((conn), \
typesafe_cb_preargs(void, void *, \ typesafe_cb_preargs(void, void *, \
(finish), (arg), \ (finish), (arg), \
struct io_conn *), \ struct io_conn *), \
(arg))) (arg))
struct io_conn *io_new_conn_(int fd, void io_set_finish_(struct io_conn *conn,
struct io_plan plan,
void (*finish)(struct io_conn *, void *), void (*finish)(struct io_conn *, void *),
void *arg); void *arg);
...@@ -241,8 +250,6 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, ...@@ -241,8 +250,6 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
* io_duplex - split an fd into two connections. * io_duplex - split an fd into two connections.
* @conn: a connection. * @conn: a connection.
* @plan: the first I/O function to call. * @plan: the first I/O function to call.
* @finish: the function to call when it's closed or fails.
* @arg: the argument to @finish.
* *
* Sometimes you want to be able to simultaneously read and write on a * Sometimes you want to be able to simultaneously read and write on a
* single fd, but io forces a linear call sequence. The solition is * single fd, but io forces a linear call sequence. The solition is
...@@ -251,17 +258,10 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts, ...@@ -251,17 +258,10 @@ bool io_timeout_(struct io_conn *conn, struct timespec ts,
* *
* You must io_close() both of them to close the fd. * You must io_close() both of them to close the fd.
*/ */
#define io_duplex(conn, plan, finish, arg) \ #define io_duplex(conn, plan) \
(io_plan_other(), io_duplex_((conn), (plan), \ (io_plan_other(), io_duplex_((conn), (plan)))
typesafe_cb_preargs(void, void *, \
(finish), (arg), \
struct io_conn *), \
(arg)))
struct io_conn *io_duplex_(struct io_conn *conn, struct io_conn *io_duplex_(struct io_conn *conn, struct io_plan plan);
struct io_plan plan,
void (*finish)(struct io_conn *, void *),
void *arg);
/** /**
* io_wake - wake up an idle connection. * io_wake - wake up an idle connection.
......
...@@ -21,8 +21,8 @@ static void init_conn(int fd, int *state) ...@@ -21,8 +21,8 @@ static void init_conn(int fd, int *state)
{ {
ok1(*state == 0); ok1(*state == 0);
(*state)++; (*state)++;
if (!io_new_conn(fd, io_close(NULL, NULL), finish_ok, state)) io_set_finish(io_new_conn(fd, io_close(NULL, NULL)), finish_ok, state);
abort();
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -27,9 +27,9 @@ static void init_conn(int fd, struct data *d) ...@@ -27,9 +27,9 @@ static void init_conn(int fd, struct data *d)
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
if (!io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d), io_set_finish(io_new_conn(fd,
finish_ok, d)) io_read(d->buf, sizeof(d->buf), io_close, d)),
abort(); finish_ok, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -29,9 +29,9 @@ static void init_conn(int fd, struct data *d) ...@@ -29,9 +29,9 @@ static void init_conn(int fd, struct data *d)
d->state++; d->state++;
d->bytes = sizeof(d->buf); d->bytes = sizeof(d->buf);
if (!io_new_conn(fd, io_read_partial(d->buf, &d->bytes, io_close, d), io_set_finish(io_new_conn(fd,
finish_ok, d)) io_read_partial(d->buf, &d->bytes, io_close, d)),
abort(); finish_ok, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -27,9 +27,9 @@ static void init_conn(int fd, struct data *d) ...@@ -27,9 +27,9 @@ static void init_conn(int fd, struct data *d)
{ {
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
if (!io_new_conn(fd, io_write_partial(d->buf, &d->bytes, io_close, d), io_set_finish(io_new_conn(fd,
finish_ok, d)) io_write_partial(d->buf, &d->bytes, io_close, d)),
abort(); finish_ok, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -27,9 +27,8 @@ static void init_conn(int fd, struct data *d) ...@@ -27,9 +27,8 @@ static void init_conn(int fd, struct data *d)
{ {
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
if (!io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d), io_set_finish(io_new_conn(fd, io_write(d->buf, d->bytes, io_close, d)),
finish_ok, d)) finish_ok, d);
abort();
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -52,12 +52,14 @@ static void init_conn(int fd, struct data *d) ...@@ -52,12 +52,14 @@ static void init_conn(int fd, struct data *d)
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
idler = io_new_conn(fd, io_idle(), finish_idle, d); idler = io_new_conn(fd, io_idle());
io_set_finish(idler, finish_idle, d);
/* This will wake us up, as read will fail. */ /* This will wake us up, as read will fail. */
fd2 = open("/dev/null", O_RDONLY); fd2 = open("/dev/null", O_RDONLY);
ok1(fd2 >= 0); ok1(fd2 >= 0);
ok1(io_new_conn(fd2, io_read(idler, 1, never, NULL), finish_waker, d)); io_set_finish(io_new_conn(fd2, io_read(idler, 1, never, NULL)),
finish_waker, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
...@@ -100,7 +102,7 @@ int main(void) ...@@ -100,7 +102,7 @@ int main(void)
int fd, status; int fd, status;
/* This is how many tests you plan to run */ /* This is how many tests you plan to run */
plan_tests(14); plan_tests(13);
d->state = 0; d->state = 0;
fd = make_listen_fd(PORT, &addrinfo); fd = make_listen_fd(PORT, &addrinfo);
ok1(fd >= 0); ok1(fd >= 0);
......
...@@ -33,11 +33,10 @@ static void init_conn(int fd, struct data *d) ...@@ -33,11 +33,10 @@ static void init_conn(int fd, struct data *d)
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
if (!io_new_conn(fd, io_set_finish(io_new_conn(fd,
io_break(d, io_break(d,
io_read(d->buf, sizeof(d->buf), read_done, d)), io_read(d->buf, sizeof(d->buf), read_done, d))),
finish_ok, d)) finish_ok, d);
abort();
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -26,13 +26,12 @@ int main(void) ...@@ -26,13 +26,12 @@ int main(void)
ok1(pipe(fds) == 0); ok1(pipe(fds) == 0);
/* Write then close. */ /* Write then close. */
io_new_conn(fds[1], io_write("hello there world", 16, io_close, NULL), io_new_conn(fds[1], io_write("hello there world", 16, io_close, NULL));
NULL, NULL); conn = io_new_conn(fds[0], io_idle());
conn = io_new_conn(fds[0], io_idle(), NULL, NULL);
/* To avoid assert(num_waiting) */ /* To avoid assert(num_waiting) */
ok1(pipe(fds2) == 0); ok1(pipe(fds2) == 0);
io_new_conn(fds2[0], io_read(buf, 16, io_close, NULL), NULL, NULL); io_new_conn(fds2[0], io_read(buf, 16, io_close, NULL));
/* After half a second, it will read. */ /* After half a second, it will read. */
io_timeout(conn, time_from_msec(500), timeout_wakeup, buf); io_timeout(conn, time_from_msec(500), timeout_wakeup, buf);
......
...@@ -23,9 +23,8 @@ int main(void) ...@@ -23,9 +23,8 @@ int main(void)
plan_tests(3); plan_tests(3);
ok1(pipe(fds) == 0); ok1(pipe(fds) == 0);
conn = io_new_conn(fds[0], io_idle(), NULL, NULL); conn = io_new_conn(fds[0], io_idle());
io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn), io_new_conn(fds[1], io_write("EASYTEST", 8, wake_it, conn));
NULL, NULL);
ok1(io_loop() == NULL); ok1(io_loop() == NULL);
ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0); ok1(memcmp(inbuf, "EASYTEST", sizeof(inbuf)) == 0);
......
...@@ -66,14 +66,13 @@ int main(void) ...@@ -66,14 +66,13 @@ int main(void)
sprintf(buf[i].buf, "%i-%i", i, i); sprintf(buf[i].buf, "%i-%i", i, i);
/* Wait for writer to tell us to read. */ /* Wait for writer to tell us to read. */
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, &buf[i]); buf[i].reader = io_new_conn(last_read, io_idle());
if (!buf[i].reader) if (!buf[i].reader)
break; break;
buf[i].writer = io_new_conn(fds[1], buf[i].writer = io_new_conn(fds[1],
io_write(&buf[i].buf, io_write(&buf[i].buf,
sizeof(buf[i].buf), sizeof(buf[i].buf),
poke_reader, &buf[i]), poke_reader, &buf[i]));
NULL, &buf[i]);
if (!buf[i].writer) if (!buf[i].writer)
break; break;
last_read = fds[0]; last_read = fds[0];
...@@ -84,12 +83,11 @@ int main(void) ...@@ -84,12 +83,11 @@ int main(void)
/* Last one completes the cirle. */ /* Last one completes the cirle. */
i = 0; i = 0;
sprintf(buf[i].buf, "%i-%i", i, i); sprintf(buf[i].buf, "%i-%i", i, i);
buf[i].reader = io_new_conn(last_read, io_idle(), NULL, NULL); buf[i].reader = io_new_conn(last_read, io_idle());
ok1(buf[i].reader); ok1(buf[i].reader);
buf[i].writer = io_new_conn(last_write, buf[i].writer = io_new_conn(last_write,
io_write(&buf[i].buf, sizeof(buf[i].buf), io_write(&buf[i].buf, sizeof(buf[i].buf),
poke_reader, &buf[i]), poke_reader, &buf[i]));
NULL, NULL);
ok1(buf[i].writer); ok1(buf[i].writer);
/* They should eventually exit */ /* They should eventually exit */
......
...@@ -39,10 +39,11 @@ static void init_conn(int fd, struct data *d) ...@@ -39,10 +39,11 @@ static void init_conn(int fd, struct data *d)
memset(d->wbuf, 7, sizeof(d->wbuf)); memset(d->wbuf, 7, sizeof(d->wbuf));
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d), conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), io_close, d));
finish_ok, d); io_set_finish(conn, finish_ok, d);
ok1(io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d), conn = io_duplex(conn, io_write(d->wbuf, sizeof(d->wbuf), write_done, d));
finish_ok, d)); ok1(conn);
io_set_finish(conn, finish_ok, d);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
...@@ -17,7 +17,7 @@ int main(void) ...@@ -17,7 +17,7 @@ int main(void)
int fds[2]; int fds[2];
ok1(pipe(fds) == 0); ok1(pipe(fds) == 0);
io_new_conn(fds[0], io_idle(), NULL, NULL); io_new_conn(fds[0], io_idle());
io_loop(); io_loop();
exit(1); exit(1);
} }
......
...@@ -48,8 +48,8 @@ static void init_conn(int fd, struct data *d) ...@@ -48,8 +48,8 @@ static void init_conn(int fd, struct data *d)
ok1(d->state == 0); ok1(d->state == 0);
d->state++; d->state++;
conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d), conn = io_new_conn(fd, io_read(d->buf, sizeof(d->buf), no_timeout, d));
finish_ok, d); io_set_finish(conn, finish_ok, d);
io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d); io_timeout(conn, time_from_usec(d->timeout_usec), timeout, d);
} }
......
...@@ -92,8 +92,8 @@ static void init_conn(int fd, struct packet *pkt) ...@@ -92,8 +92,8 @@ static void init_conn(int fd, struct packet *pkt)
ok1(pkt->state == 0); ok1(pkt->state == 0);
pkt->state++; pkt->state++;
if (!io_new_conn(fd, io_read_packet(pkt, io_close, pkt), finish_ok, pkt)) io_set_finish(io_new_conn(fd, io_read_packet(pkt, io_close, pkt)),
abort(); finish_ok, pkt);
} }
static int make_listen_fd(const char *port, struct addrinfo **info) static int make_listen_fd(const char *port, struct addrinfo **info)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment