Commit e4a617ec authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a094e447
......@@ -121,6 +121,11 @@ func TestMasterStorage(t *testing.T) {
//net := xnet.NetTrace(pipenet.New(""), tracer) // test network
net := pipenet.New("testnet") // test network
// syntatic shortcut for net tx events
nettx := func(src, dst, pkt string) *xnet.TraceTx {
return &xnet.TraceTx{Src: net.Addr(src), Dst: net.Addr(dst), Pkt: []byte(pkt)}
}
Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer)
......@@ -146,7 +151,7 @@ func TestMasterStorage(t *testing.T) {
// start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", Maddr, Saddr, net, zstor)
S := NewStorage("abc1", Maddr, Saddr, Shost, zstor)
Sctx, Scancel := context.WithCancel(context.Background())
wg.Gox(func() {
err := S.Run(Sctx)
......@@ -159,21 +164,10 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(&xnet.TraceDial{Dst: "0"})
//tc.ExpectNetDial("0")
//tc.Expect(xnet.NetTx{Src: "2c", Dst: "2s", Pkt: []byte("\x00\x00\x00\x01")})
//tc.Expect(nettx("2c", "2s", "\x00\x00\x00\x01"))
// handshake
tc.Expect(nettx("s:1", "m:1", "\x00\x00\x00\x01"))
tc.Expect(nettx("m:1", "s:1", "\x00\x00\x00\x01"))
//tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake
//tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01")
tc.ExpectPar(
//&xnet.TraceTx{Src: "2c", Dst: "2s", Pkt: []byte("\x00\x00\x00\x01")},
//&xnet.TraceTx{Src: "2s", Dst: "2c", Pkt: []byte("\x00\x00\x00\x01")},)
&xnet.TraceTx{Src: &pipenet.Addr{Net: "pipe", Port: 2, Endpoint: 0}, Dst: &pipenet.Addr{Net: "pipe", Port: 2, Endpoint: 1}, Pkt: []byte("\x00\x00\x00\x01")},
nettx("s:1", "m:1", "\x00\x00\x00\x01"), // handshake
nettx("m:1", "s:1", "\x00\x00\x00\x01"),
)
//&xnet.TraceTx{Src: "2s", Dst: "2c", Pkt: []byte("\x00\x00\x00\x01")},)
// XXX temp
......
......@@ -151,21 +151,17 @@ func (n *Network) Host(name string) *Host {
// resolveAddr resolves addr on the network from the host point of view
// must be called with Network.mu held
func (h *Host) resolveAddr(addr string) (host *Host, port int, err error) {
hoststr, portstr, err := net.SplitHostPort(addr)
a, err := h.network.ParseAddr(addr)
if err != nil {
return nil, 0, err
}
port, err = strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, 0, &net.AddrError{Err: "invalid", Addr: addr}
}
// local host if host name omitted
if hoststr == "" {
hoststr = h.name
if a.Host == "" {
a.Host = h.name
}
host = h.network.hostMap[hoststr]
host = h.network.hostMap[a.Host]
if host == nil {
return nil, 0, &net.AddrError{Err: "no such host", Addr: addr}
}
......@@ -396,6 +392,19 @@ func (sk *socket) addr() *Addr {
func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// ParseAddr parses addr into pipenet address
func (n *Network) ParseAddr(addr string) (*Addr, error) {
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, &net.AddrError{Err: "invalid port", Addr: addr}
}
return &Addr{Net: n.Network(), Host: host, Port: port}, nil
}
// Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
return l.socket.addr()
......
......@@ -22,7 +22,6 @@ import (
"fmt"
"io"
"net"
"strconv"
"reflect"
"testing"
......@@ -89,54 +88,48 @@ func assertEq(t *testing.T, a, b interface{}) {
func TestPipeNet(t *testing.T) {
pnet := New("t")
addr := func(hostport string) *Addr { // XXX -> Network.ParseAddr ?
host, portstr, err := net.SplitHostPort(hostport)
if err != nil {
t.Fatal(err)
}
port, err := strconv.Atoi(portstr)
if err != nil {
t.Fatal(err)
}
return &Addr{Net: pnet.Network(), Host: host, Port: port}
xaddr := func(addr string) *Addr {
a, err := pnet.ParseAddr(addr)
exc.Raiseif(err)
return a
}
:= pnet.Host("α")
:= pnet.Host("β")
_, err := .Dial(context.Background(), ":0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipet", Addr: addr("α:0"), Err: errConnRefused})
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipet", Addr: xaddr("α:0"), Err: errConnRefused})
l1 := xlisten(, "")
assertEq(t, l1.Addr(), addr("α:0"))
assertEq(t, l1.Addr(), xaddr("α:0"))
wg := &xsync.WorkGroup{}
wg.Gox(func() {
c1s := xaccept(l1)
assertEq(t, c1s.LocalAddr(), addr("α:1"))
assertEq(t, c1s.RemoteAddr(), addr("β:0"))
assertEq(t, c1s.LocalAddr(), xaddr("α:1"))
assertEq(t, c1s.RemoteAddr(), xaddr("β:0"))
assertEq(t, xread(c1s), "ping")
xwrite(c1s, "pong")
c2s := xaccept(l1)
assertEq(t, c2s.LocalAddr(), addr("α:2"))
assertEq(t, c2s.RemoteAddr(), addr("β:1"))
assertEq(t, c2s.LocalAddr(), xaddr("α:2"))
assertEq(t, c2s.RemoteAddr(), xaddr("β:1"))
assertEq(t, xread(c2s), "hello")
xwrite(c2s, "world")
})
c1c := xdial(, "α:0")
assertEq(t, c1c.LocalAddr(), addr("β:0"))
assertEq(t, c1c.RemoteAddr(), addr("α:1"))
assertEq(t, c1c.LocalAddr(), xaddr("β:0"))
assertEq(t, c1c.RemoteAddr(), xaddr("α:1"))
xwrite(c1c, "ping")
assertEq(t, xread(c1c), "pong")
c2c := xdial(, "α:0")
assertEq(t, c2c.LocalAddr(), addr("β:1"))
assertEq(t, c2c.RemoteAddr(), addr("α:2"))
assertEq(t, c2c.LocalAddr(), xaddr("β:1"))
assertEq(t, c2c.RemoteAddr(), xaddr("α:2"))
xwrite(c2c, "hello")
assertEq(t, xread(c2c), "world")
......@@ -144,5 +137,5 @@ func TestPipeNet(t *testing.T) {
xwait(wg)
l2 := xlisten(, "")
assertEq(t, l2.Addr(), addr("α:3"))
assertEq(t, l2.Addr(), xaddr("α:3"))
}
......@@ -139,7 +139,7 @@ loop:
// found matching event - good
eventExpectV = append(eventExpectV[:i], eventExpectV[i+1:]...)
close(msg.Ack)
close(msg.Ack) // XXX -> send ack for all only when all collected?
continue loop
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment