Commit 1df5acab authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Fix initialization when server database is empty

In that case at0 was initialized as 0 and still considered uninitialized
by flushEventq0:

    (neo) (z-dev) (g.env) kirr@deco:~/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo$ go test -run Empty
    ------
    2020-10-12T07:39:25 INFO ZEO.runzeo (146240) opening storage '1' using FileStorage
    ------
    2020-10-12T07:39:25 INFO ZEO.StorageServer StorageServer created RW with storages: 1:RW:/tmp/zeo905263273/1.fs
    ------
    2020-10-12T07:39:25 INFO ZEO.asyncio.server listening on /tmp/zeo905263273/1.fs.zeosock
    ------
    2020-10-12T07:39:25 INFO ZEO.asyncio.base Connected server protocol
    ------
    2020-10-12T07:39:25 INFO ZEO.asyncio.server received handshake 'Z5'
    2020/10/12 07:39:25 /tmp/zeo905263273/1.fs.zeosock: EOF
    --- FAIL: TestEmptyDB (0.22s)
        --- FAIL: TestEmptyDB/py/msgpack=false (0.22s)
    panic: flush, but .at0 not yet initialized [recovered]
            panic: flush, but .at0 not yet initialized

    goroutine 7 [running]:
    testing.tRunner.func1.1(0x644a60, 0x6e1a50)
            /home/kirr/src/tools/go/go/src/testing/testing.go:1072 +0x30d
    testing.tRunner.func1(0xc000001e00)
            /home/kirr/src/tools/go/go/src/testing/testing.go:1075 +0x41a
    panic(0x644a60, 0x6e1a50)
            /home/kirr/src/tools/go/go/src/runtime/panic.go:969 +0x175
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zeo).flushEventq0(0xc00018a000)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo.go:180 +0xf3
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.openByURL(0x6e9ca0, 0xc000016108, 0xc000138120, 0xc000153d98, 0x0, 0x0, 0x0, 0x0, 0x0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo.go:488 +0x5ba
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.zeoOpen(0xc000018740, 0x1e, 0xc000049d98, 0x0, 0x0, 0x0, 0x0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:285 +0x17b
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEO.func1(0xc000001e00, 0x6e9ea0, 0xc00005e6c0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:219 +0xd0
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func2.1(0xc0000185c0, 0x16)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:205 +0xfb
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func1(0xc000001e00, 0xc00000e5a0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:185 +0x129
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func2(0xc000001e00)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:197 +0x105
    testing.tRunner(0xc000001e00, 0xc00000e440)
            /home/kirr/src/tools/go/go/src/testing/testing.go:1123 +0xef
    created by testing.(*T).Run
            /home/kirr/src/tools/go/go/src/testing/testing.go:1168 +0x2b3
    exit status 2
    FAIL    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo     0.227s

-> Fix it by using dedicated field marking whether .at0 was initialized or not yet.
parent fe6e6107
...@@ -42,9 +42,11 @@ type zeo struct { ...@@ -42,9 +42,11 @@ type zeo struct {
// driver client <- watcher: database commits | errors. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server head zodb.Tid // last invalidation received from server
at0Mu sync.Mutex at0Mu sync.Mutex
at0 zodb.Tid // at0 obtained when initially connecting to server at0 zodb.Tid // at0 obtained when initially connecting to server
eventq0 []*zodb.EventCommit // buffer for initial messages, until .at0 is initialized eventq0 []*zodb.EventCommit // buffer for initial messages, until .at0 is initialized
at0Initialized bool // true after .at0 is initialized
// becomes ready when serve loop finishes // becomes ready when serve loop finishes
serveWG sync.WaitGroup serveWG sync.WaitGroup
...@@ -161,7 +163,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) { ...@@ -161,7 +163,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
// queue initial events until .at0 is initialized after register // queue initial events until .at0 is initialized after register
// queued events will be sent to watchq by zeo ctor after initializing .at0 // queued events will be sent to watchq by zeo ctor after initializing .at0
if z.at0 == 0 { if !z.at0Initialized {
z.eventq0 = append(z.eventq0, event) z.eventq0 = append(z.eventq0, event)
return nil return nil
} }
...@@ -176,7 +178,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) { ...@@ -176,7 +178,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
// flushEventq0 flushes events queued in z.eventq0. // flushEventq0 flushes events queued in z.eventq0.
// must be called under .at0Mu // must be called under .at0Mu
func (z *zeo) flushEventq0() { func (z *zeo) flushEventq0() {
if z.at0 == 0 { if !z.at0Initialized {
panic("flush, but .at0 not yet initialized") panic("flush, but .at0 not yet initialized")
} }
if z.watchq != nil { if z.watchq != nil {
...@@ -443,7 +445,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -443,7 +445,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// close .watchq after serve is over // close .watchq after serve is over
z.at0Mu.Lock() z.at0Mu.Lock()
defer z.at0Mu.Unlock() defer z.at0Mu.Unlock()
if z.at0 != 0 { if z.at0Initialized {
z.flushEventq0() z.flushEventq0()
} }
if z.watchq != nil { if z.watchq != nil {
...@@ -485,6 +487,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -485,6 +487,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// filter-out first < at0 messages for this reason. // filter-out first < at0 messages for this reason.
z.at0Mu.Lock() z.at0Mu.Lock()
z.at0 = lastTid z.at0 = lastTid
z.at0Initialized = true
z.flushEventq0() z.flushEventq0()
z.at0Mu.Unlock() z.at0Mu.Unlock()
......
...@@ -240,6 +240,19 @@ func TestHandshake(t *testing.T) { ...@@ -240,6 +240,19 @@ func TestHandshake(t *testing.T) {
}) })
} }
// verify that connection to ZEO server with empty DB works ok.
func TestEmptyDB(t *testing.T) {
withZEO(t, func(t *testing.T, zsrv ZEOSrv, z *zeo) {
X := xtesting.FatalIf(t)
ctx := context.Background()
head, err := z.Sync(ctx); X(err)
headOk := zodb.Tid(0)
if head != headOk {
t.Errorf("head=%s ; expected %s", head, headOk)
}
})
}
func TestLoad(t *testing.T) { func TestLoad(t *testing.T) {
X := exc.Raiseif X := exc.Raiseif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment