Commit 0fa03444 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f52053c4
...@@ -70,8 +70,6 @@ type Addr struct { ...@@ -70,8 +70,6 @@ type Addr struct {
addr string // XXX -> port ? + including c/s ? addr string // XXX -> port ? + including c/s ?
} }
var _ net.Addr = (*Addr)(nil)
func (a *Addr) Network() string { return a.network } func (a *Addr) Network() string { return a.network }
func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ? func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ?
func (n *Network) netname() string { return NetPrefix + n.Name } func (n *Network) netname() string { return NetPrefix + n.Name }
...@@ -120,32 +118,26 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -120,32 +118,26 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
network: n, network: n,
port: port, port: port,
dialq: make(chan chan net.Conn), dialq: make(chan chan net.Conn),
// acceptq: make(chan chan connected),
down: make(chan struct{}), down: make(chan struct{}),
} }
n.pipev[port] = &pipe{listener: l} n.pipev[port] = &pipe{listener: l}
// go l.listen()
return l, nil return l, nil
} }
// listener implements net.Listener for piped network // listener implements net.Listener for piped network
type listener struct { type listener struct {
network *Network // XXX needed ? // network/port we are listening on
port int // port we are listening on XXX needed ? network *Network
port int
dialq chan chan net.Conn // Dial requests to our port go here dialq chan chan net.Conn // Dial requests to our port go here
// acceptq chan chan connected // Accept requests go here
down chan struct{} // Close -> down=ready down chan struct{} // Close -> down=ready
downOnce sync.Once // so Close several times is ok downOnce sync.Once // so Close several times is ok
} }
var _ net.Listener = (*listener)(nil) // Close closes the listener
// it interrupts all currently in-flight calls to Accept
func (l *listener) Addr() net.Addr {
return &Addr{network: l.network.netname(), addr: fmt.Sprintf("%d", l.port)} // NOTE no c/s XXX -> +l ?
}
func (l *listener) Close() error { func (l *listener) Close() error {
l.downOnce.Do(func() { l.downOnce.Do(func() {
close(l.down) close(l.down)
...@@ -153,50 +145,25 @@ func (l *listener) Close() error { ...@@ -153,50 +145,25 @@ func (l *listener) Close() error {
return nil return nil
} }
/* // Accept tries to connect to Dial called with addr corresponding to our listener
// listen implements listener service process - connecting Accept and Dial calls with each other
func (l *listener) listen() {
for {
select {
case <-l.down:
return
case resp := <-l.dialq:
// TODO
case resp := <-l.acceptq:
// TODO
}
}
}
// connected is response from listener to Dial and Accept
type connected struct {
conn net.Conn
err error
}
*/
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
// ch := make(chan connected)
select { select {
case <-l.down: case <-l.down:
return nil, &net.OpError{Op: "accept", Net: l.network.netname(), Addr: l.Addr(), Err: errNetClosed} return nil, &net.OpError{Op: "accept", Net: l.network.netname(), Addr: l.Addr(), Err: errNetClosed}
/*
case l.acceptq <- ch:
resp := <-ch
return resp.conn, resp.err
*/
case resp := <-l.dialq: case resp := <-l.dialq:
// someone dialed us - let's connect // someone dialed us - let's connect
pc, ps := net.Pipe() pc, ps := net.Pipe()
// XXX allocate port and register to l.network.pipev
resp <- pc resp <- pc
return ps, nil return ps, nil
} }
} }
// Dial tries to connect to Accept called on listener corresponding to addr
func (n *Network) Dial(addr string) (net.Conn, error) { func (n *Network) Dial(addr string) (net.Conn, error) {
derr := func(err error) error { derr := func(err error) error {
return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err} return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err}
...@@ -220,7 +187,8 @@ func (n *Network) Dial(addr string) (net.Conn, error) { ...@@ -220,7 +187,8 @@ func (n *Network) Dial(addr string) (net.Conn, error) {
} }
l := p.listener l := p.listener
// NOTE listener is not locking n.mu -> it is ok to send/receive under mu // NOTE listener is not locking n.mu -> it is ok to send/receive under mu - FIXME not correct
// FIXME -> Accept needs to register new connection under n.mu
resp := make(chan net.Conn) resp := make(chan net.Conn)
select { select {
case <-l.down: case <-l.down:
...@@ -231,19 +199,36 @@ func (n *Network) Dial(addr string) (net.Conn, error) { ...@@ -231,19 +199,36 @@ func (n *Network) Dial(addr string) (net.Conn, error) {
} }
} }
// Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
return &Addr{network: l.network.netname(), addr: fmt.Sprintf("%d", l.port)} // NOTE no c/s XXX -> +l ?
}
// ----------------------------------------
// conn represents one endpoint of connection created under Network
type conn struct {
network *Network
// XXX port + c/s ?
net.Conn
}
// XXX conn.Close - unregister from network.connv
// XXX conn.LocalAddr -> ...
// XXX conn.RemoteAddr -> ...
// ----------------------------------------
var ( var (
netMu sync.Mutex netMu sync.Mutex
networks = map[string]*Network{} // netsuffix -> Network networks = map[string]*Network{} // netSuffix -> Network
DefaultNet = New("") DefaultNet = New("")
) )
// New creates, initializes and returns new pipenet Network // New creates, initializes and returns new pipenet Network
// network name is name of this network under "pipe" namesapce, e.g. ""
// network name must be unique - if not New will panic // network name must be unique - if not New will panic
func New(name string) *Network { func New(name string) *Network {
netMu.Lock() netMu.Lock()
...@@ -251,7 +236,7 @@ func New(name string) *Network { ...@@ -251,7 +236,7 @@ func New(name string) *Network {
_, already := networks[name] _, already := networks[name]
if already { if already {
panic(fmt.Errorf("pipenet %v already registered", name)) panic(fmt.Errorf("pipenet %q already registered", name))
} }
n := &Network{Name: name} n := &Network{Name: name}
...@@ -260,6 +245,7 @@ func New(name string) *Network { ...@@ -260,6 +245,7 @@ func New(name string) *Network {
} }
// lookupNet lookups Network by name // lookupNet lookups Network by name
// name is full network name, e.g. "pipe"
func lookupNet(name string) (*Network, error) { func lookupNet(name string) (*Network, error) {
if !strings.HasPrefix(NetPrefix, name) { if !strings.HasPrefix(NetPrefix, name) {
return nil, errBadNetwork return nil, errBadNetwork
...@@ -276,7 +262,7 @@ func lookupNet(name string) (*Network, error) { ...@@ -276,7 +262,7 @@ func lookupNet(name string) (*Network, error) {
} }
// Dial dials addr on a pipenet // Dial dials addr on a pipenet
// network should be full valid registered pipe network name, e.g. "pipe" // network should be full network name, e.g. "pipe"
func Dial(network, addr string) (net.Conn, error) { func Dial(network, addr string) (net.Conn, error) {
n, err := lookupNet(network) n, err := lookupNet(network)
if err != nil { if err != nil {
...@@ -287,7 +273,7 @@ func Dial(network, addr string) (net.Conn, error) { ...@@ -287,7 +273,7 @@ func Dial(network, addr string) (net.Conn, error) {
} }
// Listen starts listening on a pipenet address // Listen starts listening on a pipenet address
// network should be full valid registered pipe network name, e.g. "pipe" // network should be full network name, e.g. "pipe"
func Listen(network, laddr string) (net.Listener, error) { func Listen(network, laddr string) (net.Listener, error) {
n, err := lookupNet(network) n, err := lookupNet(network)
if err != nil { if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment