Commit 8f8b7352 authored by Russ Cox's avatar Russ Cox

Native Client SRPC (simple RPC), both server and client.

R=r
DELTA=958  (958 added, 0 deleted, 0 changed)
OCL=35096
CL=35106
parent 5c2c57e5
# Copyright 2009 The Go Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
include $(GOROOT)/src/Make.$(GOARCH)
TARG=nacl/srpc
GOFILES=\
client.go\
msg.go\
server.go\
include $(GOROOT)/src/Make.pkg
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This package implements Native Client's simple RPC (SRPC).
package srpc
import (
"bytes";
"log";
"os";
"sync";
)
// A Client represents the client side of an SRPC connection.
type Client struct {
fd int; // fd to server
r msgReceiver;
s msgSender;
service map[string]srv; // services by name
out chan *msg; // send to out to write to connection
mu sync.Mutex; // protects pending, idGen
pending map[uint64]*RPC;
idGen uint64; // generator for request IDs
}
// A srv is a single method that the server offers.
type srv struct {
num uint32; // method number
fmt string; // argument format
}
// An RPC represents a single RPC issued by a client.
type RPC struct {
Ret []interface{}; // Return values
Done chan *RPC; // Channel where notification of done arrives
Errno Errno; // Status code
c *Client;
id uint64; // request id
}
// NewClient allocates a new client using the file descriptor fd.
func NewClient(fd int) (c *Client, err os.Error) {
c = new(Client);
c.fd = fd;
c.r.fd = fd;
c.s.fd = fd;
c.service = make(map[string]srv);
c.pending = make(map[uint64]*RPC);
// service discovery request
m := &msg{
protocol: protocol,
isReq: true,
Ret: []interface{}{ []byte(nil) },
Size: []int{ 4000 },
};
m.packRequest();
c.s.send(m);
m, err = c.r.recv();
if err != nil {
return nil, err;
}
m.unpackResponse();
if m.status != OK {
log.Stderrf("NewClient service_discovery: %s", m.status);
return nil, m.status;
}
for n, line := range bytes.Split(m.Ret[0].([]byte), []byte{'\n'}, 0) {
i := bytes.Index(line, []byte{':'});
if i < 0 {
continue;
}
c.service[string(line[0:i])] = srv{uint32(n), string(line[i+1:len(line)])};
}
c.out = make(chan *msg);
go c.input();
go c.output();
return c, nil;
}
func (c *Client) input() {
for {
m, err := c.r.recv();
if err != nil {
log.Exitf("client recv: %s", err);
}
if m.unpackResponse(); m.status != OK {
log.Stderrf("invalid message: %s", m.status);
continue;
}
c.mu.Lock();
rpc, ok := c.pending[m.requestId];
if ok {
c.pending[m.requestId] = nil, false;
}
c.mu.Unlock();
if !ok {
log.Stderrf("unexpected response");
continue;
}
rpc.Ret = m.Ret;
rpc.Done <- rpc;
}
}
func (c *Client) output() {
for m := range c.out {
c.s.send(m);
}
}
// NewRPC creates a new RPC on the client connection.
func (c *Client) NewRPC(done chan *RPC) *RPC {
if done == nil {
done = make(chan *RPC);
}
c.mu.Lock();
id := c.idGen;
c.idGen++;
c.mu.Unlock();
return &RPC{nil, done, OK, c, id};
}
// Start issues an RPC request for method name with the given arguments.
// The RPC r must not be in use for another pending request.
// To wait for the RPC to finish, receive from r.Done and then
// inspect r.Ret and r.Errno.
func (r *RPC) Start(name string, arg []interface{}) {
var m msg;
r.Errno = OK;
r.c.mu.Lock();
srv, ok := r.c.service[name];
if !ok {
r.c.mu.Unlock();
r.Errno = ErrBadRPCNumber;
r.Done <- r;
return;
}
r.c.pending[r.id] = r;
r.c.mu.Unlock();
m.protocol = protocol;
m.requestId = r.id;
m.isReq = true;
m.rpcNumber = srv.num;
m.Arg = arg;
// Fill in the return values and sizes to generate
// the right type chars. We'll take most any size.
// Skip over input arguments.
// We could check them against arg, but the server
// will do that anyway.
i := 0;
for srv.fmt[i] != ':' {
i++;
}
fmt := srv.fmt[i+1:len(srv.fmt)];
// Now the return prototypes.
m.Ret = make([]interface{}, len(fmt) - i);
m.Size = make([]int, len(fmt) - i);
for i := 0; i < len(fmt); i++ {
switch fmt[i] {
default:
log.Exitf("unexpected service type %c", fmt[i]);
case 'b':
m.Ret[i] = false;
case 'C':
m.Ret[i] = []byte(nil);
m.Size[i] = 1<<30;
case 'd':
m.Ret[i] = float64(0);
case 'D':
m.Ret[i] = []float64(nil);
m.Size[i] = 1<<30;
case 'h':
m.Ret[i] = int(-1);
case 'i':
m.Ret[i] = int32(0);
case 'I':
m.Ret[i] = []int32(nil);
m.Size[i] = 1<<30;
case 's':
m.Ret[i] = "";
m.Size[i] = 1<<30;
}
}
m.packRequest();
r.c.out <- &m;
}
// Call is a convenient wrapper that starts the RPC request,
// waits for it to finish, and then returns the results.
// Its implementation is:
//
// r.Start(name, arg);
// <-r.Done;
// return r.Ret, r.Errno;
//
func (r *RPC) Call(name string, arg []interface{}) (ret []interface{}, err Errno) {
r.Start(name, arg);
<-r.Done;
return r.Ret, r.Errno;
}
This diff is collapsed.
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// SRPC server
package srpc
import (
"bytes";
"log";
"os";
"syscall";
)
// TODO(rsc): I'd prefer to make this
// type Handler func(m *msg) Errno
// but NaCl can't use closures.
// The explicit interface is a way to attach state.
// A Handler is a handler for an SRPC method.
// It reads arguments from m.Arg, checks m.Size for array limits,
// writes return values to m.Ret, and returns an Errno status code.
type Handler interface {
Run(m *msg) Errno
}
type method struct {
name string;
fmt string;
handler Handler;
}
var rpcMethod []method
// BUG(rsc): Add's format string should be replaced by analyzing the
// type of an arbitrary func passed in an interface{} using reflection.
// Add registers a handler for the named method.
// Fmt is a Native Client format string, a sequence of
// alphabetic characters representing the types of the parameter values,
// a colon, and then a sequence of alphabetic characters
// representing the types of the returned values.
// The format characters and corresponding dynamic types are:
//
// b bool
// C []byte
// d float64
// D []float64
// h int // a file descriptor (aka handle)
// i int32
// I []int32
// s string
//
func Add(name, fmt string, handler Handler) {
n := len(rpcMethod);
if n >= cap(rpcMethod) {
a := make([]method, n, (n+4)*2);
for i := range a {
a[i] = rpcMethod[i];
}
rpcMethod = a;
}
rpcMethod = rpcMethod[0:n+1];
rpcMethod[n] = method{name, fmt, handler};
}
// Serve accepts new SRPC connections from the file descriptor fd
// and answers RPCs issued on those connections.
// It closes fd and returns an error if the imc_accept system call fails.
func Serve(fd int) os.Error {
defer syscall.Close(fd);
for {
cfd, _, e := syscall.Syscall(syscall.SYS_IMC_ACCEPT, uintptr(fd), 0, 0);
if e != 0 {
return os.NewSyscallError("imc_accept", int(e));
}
go serveLoop(int(cfd));
}
panic("unreachable");
}
func serveLoop(fd int) {
c := make(chan *msg);
go sendLoop(fd, c);
var r msgReceiver;
r.fd = fd;
for {
m, err := r.recv();
if err != nil {
break;
}
m.unpackRequest();
if !m.gotHeader {
log.Stderrf("cannot unpack header: %s", m.status);
continue;
}
// log.Stdoutf("<- %#v", m);
m.isReq = false; // set up for response
go serveMsg(m, c);
}
close(c);
}
func sendLoop(fd int, c <-chan *msg) {
var s msgSender;
s.fd = fd;
for m := range c {
// log.Stdoutf("-> %#v", m);
m.packResponse();
s.send(m);
}
syscall.Close(fd);
}
func serveMsg(m *msg, c chan<- *msg) {
if m.status != OK {
c <- m;
return;
}
if m.rpcNumber >= uint32(len(rpcMethod)) {
m.status = ErrBadRPCNumber;
c <- m;
return;
}
meth := &rpcMethod[m.rpcNumber];
if meth.fmt != m.fmt {
switch {
case len(m.fmt) < len(meth.fmt):
m.status = ErrTooFewArgs;
case len(m.fmt) > len(meth.fmt):
m.status = ErrTooManyArgs;
default:
// There's a type mismatch.
// It's an in-arg mismatch if the mismatch happens
// before the colon; otherwise it's an out-arg mismatch.
m.status = ErrInArgTypeMismatch;
for i := 0; i < len(m.fmt) && m.fmt[i] == meth.fmt[i]; i++ {
if m.fmt[i] == ':' {
m.status = ErrOutArgTypeMismatch;
break;
}
}
}
c <- m;
return;
}
m.status = meth.handler.Run(m);
c <- m;
}
// ServeRuntime serves RPCs issued by the Native Client embedded runtime.
// This should be called by main once all methods have been registered using Add.
func ServeRuntime() os.Error {
// Call getFd to check that we are running embedded.
if _, err := getFd(); err != nil {
return err;
}
// We are running embedded.
// The fd returned by getFd is a red herring.
// Accept connections on magic fd 3.
return Serve(3);
}
// getFd runs the srpc_get_fd system call.
func getFd() (fd int, err os.Error) {
r1, _, e := syscall.Syscall(syscall.SYS_SRPC_GET_FD, 0, 0, 0);
return int(r1), os.NewSyscallError("srpc_get_fd", int(e));
}
// Enabled returns true if SRPC is enabled in the Native Client runtime.
func Enabled() bool {
_, err:= getFd();
return err == nil;
}
// Service #0, service_discovery, returns a list of the other services
// and their argument formats.
type serviceDiscovery struct{}
func (serviceDiscovery) Run(m *msg) Errno {
var b bytes.Buffer;
for _, m := range rpcMethod {
b.WriteString(m.name);
b.WriteByte(':');
b.WriteString(m.fmt);
b.WriteByte('\n');
}
if b.Len() > m.Size[0] {
return ErrNoMemory;
}
m.Ret[0] = b.Bytes();
return OK;
}
func init() {
Add("service_discovery", ":C", serviceDiscovery{});
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment