Commit 95e7e398 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3041fee6
...@@ -38,12 +38,21 @@ import ( ...@@ -38,12 +38,21 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
) )
// ZEOSrv represents running ZEO server.
type ZEOSrv interface {
Addr() string // unix-socket address of the server
Close() error
Encoding() byte // encoding used on the wire - 'M' or 'Z'
}
// ZEOPySrv represents running ZEO/py server. // ZEOPySrv represents running ZEO/py server.
// //
// Create it with StartZEOPySrv(fs1path). // Create it with StartZEOPySrv(fs1path).
type ZEOPySrv struct { type ZEOPySrv struct {
pysrv *exec.Cmd // spawned `runzeo -f fs1path` pysrv *exec.Cmd // spawned `runzeo -f fs1path`
fs1path string fs1path string // filestorage location
opt ZEOPyOptions // options for spawned server
cancel func() // to stop pysrv cancel func() // to stop pysrv
done chan struct{} // ready after Wait completes done chan struct{} // ready after Wait completes
errExit error // error from Wait errExit error // error from Wait
...@@ -60,7 +69,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -60,7 +69,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})} z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})}
z.pysrv = exec.CommandContext(ctx, "python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.zaddr()) z.pysrv = exec.CommandContext(ctx, "python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.Addr())
msgpack := "" msgpack := ""
if opt.msgpack { if opt.msgpack {
msgpack = "y" msgpack = "y"
...@@ -92,7 +101,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -92,7 +101,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
return nil, z.errExit return nil, z.errExit
} }
_, err := os.Stat(z.zaddr()) _, err := os.Stat(z.Addr())
if err == nil { if err == nil {
break // ZEO socket appeared break // ZEO socket appeared
} }
...@@ -109,8 +118,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -109,8 +118,7 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
return z, nil return z, nil
} }
// zaddr returns address of unix socket to access spawned ZEO server. func (z *ZEOPySrv) Addr() string {
func (z *ZEOPySrv) zaddr() string {
return z.fs1path + ".zeosock" return z.fs1path + ".zeosock"
} }
...@@ -126,8 +134,14 @@ func (z *ZEOPySrv) Close() (err error) { ...@@ -126,8 +134,14 @@ func (z *ZEOPySrv) Close() (err error) {
return err return err
} }
func (z *ZEOPySrv) Encoding() byte {
encoding := byte('Z')
if z.opt.msgpack { encoding = byte('M') }
return encoding
}
// -------- // ----------------
// withZEOPySrv spawns new ZEO/py server and runs f in that environment. // withZEOPySrv spawns new ZEO/py server and runs f in that environment.
func withZEOPySrv(t *testing.T, opt ZEOPyOptions, f func(zpy *ZEOPySrv)) { func withZEOPySrv(t *testing.T, opt ZEOPyOptions, f func(zpy *ZEOPySrv)) {
...@@ -147,26 +161,35 @@ func withZEOPySrv(t *testing.T, opt ZEOPyOptions, f func(zpy *ZEOPySrv)) { ...@@ -147,26 +161,35 @@ func withZEOPySrv(t *testing.T, opt ZEOPyOptions, f func(zpy *ZEOPySrv)) {
f(zpy) f(zpy)
} }
func TestHandshake(t *testing.T) { // withZEOSrv runs f under all kind of ZEO servers.
X := mkFatalIf(t) func withZEOSrv(t *testing.T, f func(t *testing.T, zsrv ZEOSrv)) {
for _, msgpack := range []bool{false, true} { for _, msgpack := range []bool{false, true} {
t.Run(fmt.Sprintf("msgpack=%v", msgpack), func(t *testing.T) { // ZEO/py
t.Run(fmt.Sprintf("py/msgpack=%v", msgpack), func(t *testing.T) {
withZEOPySrv(t, ZEOPyOptions{msgpack: msgpack}, func(zpy *ZEOPySrv) { withZEOPySrv(t, ZEOPyOptions{msgpack: msgpack}, func(zpy *ZEOPySrv) {
f(t, zpy)
})
})
// TODO ZEO/go
}
}
func TestHandshake(t *testing.T) {
X := mkFatalIf(t)
withZEOSrv(t, func(t *testing.T, zsrv ZEOSrv) {
ctx := context.Background() ctx := context.Background()
net := xnet.NetPlain("unix") net := xnet.NetPlain("unix")
zlink, err := dialZLink(ctx, net, zpy.zaddr()); X(err) zlink, err := dialZLink(ctx, net, zsrv.Addr()); X(err)
defer func() { defer func() {
err := zlink.Close(); X(err) err := zlink.Close(); X(err)
}() }()
ewant := byte('Z') ewant := zsrv.Encoding()
if msgpack { ewant = byte('M') }
if zlink.encoding != ewant { if zlink.encoding != ewant {
t.Fatalf("handshake: encoding=%c ; want %c", zlink.encoding, ewant) t.Fatalf("handshake: encoding=%c ; want %c", zlink.encoding, ewant)
} }
}) })
})
}
} }
func TestLoad(t *testing.T) { func TestLoad(t *testing.T) {
...@@ -188,7 +211,7 @@ func TestLoad(t *testing.T) { ...@@ -188,7 +211,7 @@ func TestLoad(t *testing.T) {
err := zpy.Close(); X(err) err := zpy.Close(); X(err)
}() }()
z, _, err := zeoOpen(zpy.zaddr(), &zodb.DriverOptions{ReadOnly: true}); X(err) z, _, err := zeoOpen(zpy.Addr(), &zodb.DriverOptions{ReadOnly: true}); X(err)
defer func() { defer func() {
err := z.Close(); X(err) err := z.Close(); X(err)
}() }()
...@@ -209,7 +232,7 @@ func TestWatch(t *testing.T) { ...@@ -209,7 +232,7 @@ func TestWatch(t *testing.T) {
err := zpy.Close(); X(err) err := zpy.Close(); X(err)
}() }()
xtesting.DrvTestWatch(t, "zeo://" + zpy.zaddr(), openByURL) xtesting.DrvTestWatch(t, "zeo://" + zpy.Addr(), openByURL)
} }
......
...@@ -75,7 +75,6 @@ type zLink struct { ...@@ -75,7 +75,6 @@ type zLink struct {
ver string // protocol version in use (without "Z" or "M" prefix) ver string // protocol version in use (without "Z" or "M" prefix)
encoding byte // protocol encoding in use ('Z' or 'M') encoding byte // protocol encoding in use ('Z' or 'M')
// XXX ^^^ better -> codec inteface{ pktEncode, pktDecode } ?
} }
// (called after handshake) // (called after handshake)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment