Commit 151d8b79 by Kirill Smelkov

go/zodb: LastTid -> Sync + Head

Today LastTid is ambiguous: does it return the locally cached last transaction ID,
or does it perform a round-trip to the server to get a more up-to-date view of what
has been last committed? As a result of this ambiguity some drivers (e.g. FileStorage,
ZEO) return the cached view, while other drivers (upcoming NEO, living on t branch)
were doing the full round-trip.

There are also situations where whether or not LastTid performs syncing to
server affects semantics: for example if there are two ERP5 nodes and one
node commits something into ZODB and conveys the fact that something has been
committed via another channel, the second ERP5 node should have a reliable way
to observe committed data. But currently there is no such way: on next DB.Open,
even with at=0 meaning "use latest transaction" and with opt.NoSync=false,
DB.Open uses storage.LastTid() as a way to sync and oops - for some drivers it
won't result in real synchronization and thus opened connection could
potentially have older view with not yet latest data.

To fix this ambiguity, require drivers to provide only Sync functionality. Sync
should return the ID of the last transaction committed to storage data as observed
from some time _after_ the Sync call was made. In other words, for the particular
client-server case, Sync cannot return a cached view of storage and has to
perform a round-trip to the server.

At IStorage level we provide two entry points: Sync, to perform syncing, and
Head that returns database head (last transaction ID) as viewed by local cache
without any synchronization.

Please see changes to IStorageDriver and IStorage interfaces for details.

The rest of the changes are:

- teach zodb.storage (the IStorage implementer) to actually implement Sync/Head.
- adapt zodb.DB to perform Sync for !opt.NoSync case.
- teach FileStorage to do proper syncing instead of returning local cache.
- teach ZEO to perform "lastTransaction" RPC call for syncing instead of
  returning local cache.
- adapt the rest of the tree to interfaces change.
1 parent 4224b580
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -180,10 +180,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
lastTid, err := stor.LastTid(ctx)
if err != nil {
return err
}
at := stor.Head()
tstart := time.Now()
......@@ -191,7 +188,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
nread := 0
loop:
for {
xid := zodb.Xid{Oid: oid, At: lastTid}
xid := zodb.Xid{Oid: oid, At: at}
if xid.Oid % nprefetch == 0 {
prefetchBlk(ctx, xid)
}
......@@ -404,9 +401,9 @@ func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ [
storv[i] = stor
// storage to warm-up the connection
// ( in case of NEO LastTid connects to master and Load
// ( in case of NEO Sync connects to master and Load
// - to a storage )
_, err = stor.LastTid(ctx)
err = stor.Sync(ctx)
if err != nil {
return err
}
......@@ -455,15 +452,12 @@ func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zo
err = xerr.First(err, err2)
}()
lastTid, err := stor.LastTid(ctx)
if err != nil {
return 0, nil, err
}
at = stor.Head()
oid := zodb.Oid(0)
loop:
for {
xid := zodb.Xid{Oid: oid, At: lastTid}
xid := zodb.Xid{Oid: oid, At: at}
buf, _, err := stor.Load(ctx, xid)
if err != nil {
switch errors.Cause(err).(type) {
......@@ -493,7 +487,7 @@ loop:
}
}
return lastTid, objcheckv, nil
return at, objcheckv, nil
}
// ----------------------------------------
......
......@@ -377,18 +377,14 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// find out db state we should open at
at := opt.At
if at == 0 {
if opt.NoSync {
db.mu.Lock()
at = db.δtail.Head()
db.mu.Unlock()
} else {
// sync storage for lastTid
var err error
at, err = db.stor.LastTid(ctx)
if !opt.NoSync {
// sync storage for head
err = db.stor.Sync(ctx)
if err != nil {
return nil, err
}
}
at = db.stor.Head()
}
// wait for db.Head ≥ at
......
......@@ -128,8 +128,8 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
l1cache: cache,
down: make(chan struct{}),
head: at0,
drvWatchq: drvWatchq,
drvHead: at0,
watchReq: make(chan watchRequest),
watchTab: make(map[chan<- Event]struct{}),
watchCancel: make(map[chan<- Event]chan struct{}),
......@@ -151,12 +151,15 @@ type storage struct {
l1cache *Cache // can be =nil, if opened with NoCache
down chan struct{} // ready when no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher|Sync
downErr error // reason for shutdown
// watcher
headMu sync.Mutex
head Tid // local view of storage head; mutated by watcher only
drvWatchq chan Event // watchq passed to driver
drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- Event]struct{} // registered watchers
......@@ -190,15 +193,6 @@ func (s *storage) Close() error {
// loading goes through cache - this way prefetching can work
// LastTid returns the last transaction ID as reported by the driver.
//
// If the storage is already down, no driver call is made: InvalidTid is
// returned together with downErr wrapped as a "last_tid" operation error.
func (s *storage) LastTid(ctx context.Context) (Tid, error) {
	// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
	if !ready(s.down) {
		return s.driver.LastTid(ctx)
	}

	return InvalidTid, s.zerr("last_tid", nil, s.downErr)
}
// Load implements Loader.
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
......@@ -284,7 +278,7 @@ func (s *storage) _watcher() error {
panic("bad watch request op")
}
req.ack <- s.drvHead
req.ack <- s.head
}
// close all subscribers' watchq on watcher shutdown
......@@ -324,13 +318,15 @@ func (s *storage) _watcher() error {
case *EventCommit:
// verify event.Tid ↑ (else e.g. δtail.Append will panic)
// if !↑ - stop the storage with error.
if !(e.Tid > s.drvHead) {
if !(e.Tid > s.head) {
errDown = fmt.Errorf(
"%s: storage error: notified with δ.tid not ↑ (%s -> %s)",
s.URL(), s.drvHead, e.Tid)
s.URL(), s.head, e.Tid)
event = &EventError{errDown}
} else {
s.drvHead = e.Tid
s.headMu.Lock()
s.head = e.Tid
s.headMu.Unlock()
}
}
......@@ -366,7 +362,9 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// no longer operational: behave if watchq was registered before that
// and then seen down/close events. Interact with DelWatch directly.
case <-s.down:
at0 = s.drvHead
s.headMu.Lock() // shutdown may be due to Close call and watcher might be
at0 = s.head // still running - we cannot skip locking.
s.headMu.Unlock()
s.watchMu.Lock()
_, already := s.watchTab[watchq]
......@@ -425,6 +423,79 @@ func (s *storage) DelWatch(watchq chan<- Event) {
}
}
// Head implements IStorage.
//
// It returns the current value of s.head, read under headMu.
func (s *storage) Head() Tid {
	s.headMu.Lock()
	defer s.headMu.Unlock()

	return s.head
}
// Sync implements IStorage.
//
// It asks the driver to sync and then waits, via a temporary watch
// subscription, until commit events up to the head reported by the driver
// have been observed. Any error returned is wrapped as a "sync" operation
// error via s.zerr.
func (s *storage) Sync(ctx context.Context) (err error) {
	defer func() {
		if err == nil {
			return
		}
		err = s.zerr("sync", nil, err)
	}()

	// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
	if ready(s.down) {
		return s.downErr
	}

	// remember local head as it was before asking the driver
	s.headMu.Lock()
	headBefore := s.head
	s.headMu.Unlock()

	drvHead, err := s.driver.Sync(ctx)
	if err != nil {
		return err
	}

	// check that driver returns head↑: a head that is behind what we
	// already have locally is a storage error -> shutdown.
	if drvHead < headBefore {
		err = fmt.Errorf("%s: storage error: sync not ↑= (%s -> %s)", s.URL(), headBefore, drvHead)
		s.shutdown(err)
		return err
	}

	// wait till .head >= drvHead
	watchq := make(chan Event)
	seen := s.AddWatch(watchq)
	defer s.DelWatch(watchq)

	for seen < drvHead {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-s.down:
			return s.downErr

		case event, ok := <-watchq:
			if !ok {
				// watchq was closed: wait for down to become ready
				// and report the shutdown reason.
				<-s.down
				return s.downErr
			}

			switch e := event.(type) {
			case *EventCommit:
				seen = e.Tid

			case *EventError:
				return e.Err

			default:
				panic(fmt.Sprintf("unexpected event %T", e))
			}
		}
	}

	return nil
}
// ---- misc ----
......
......@@ -101,6 +101,10 @@ type FileStorage struct {
// driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event
// sync(s) waiting for feedback from watcher
syncMu sync.Mutex
syncv []chan zodb.Tid
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
......@@ -115,17 +119,6 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro
return &zodb.OpError{URL: fs.URL(), Op: op, Args: args, Err: err}
}
// LastTid returns the transaction ID stored in fs.txnhMax, read under
// fs.mu held for reading.
//
// If fs.downErr is set, InvalidTid is returned together with that error
// wrapped as a "last_tid" operation error.
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
	fs.mu.RLock()
	defer fs.mu.RUnlock()

	if fs.downErr == nil {
		return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
	}

	return zodb.InvalidTid, fs.zerr("last_tid", nil, fs.downErr)
}
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
......@@ -478,7 +471,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
}
// if watcher failed with e.g. IO error, we no longer know what is real
// last_tid and which objects were modified after it.
// head and which objects were modified after it.
// -> storage operations have to fail from now on.
fs.shutdown(err)
......@@ -522,8 +515,15 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error)
defer tick.Stop()
var t0partial time.Time
first := true
var syncv []chan zodb.Tid
mainloop:
for {
// notify Sync(s) that queued before previous stat + advance
for _, sync := range syncv {
sync <- fs.txnhMax.Tid // TODO +lock after commit is implemented
}
syncv = nil // just in case
if !first {
traceWatch("select ...")
select {
......@@ -571,6 +571,12 @@ mainloop:
}
first = false
// remember queued Sync(s) that we should notify after stat + advance
fs.syncMu.Lock()
syncv = fs.syncv
fs.syncv = nil
fs.syncMu.Unlock()
// check f size, to see whether there could be any updates.
fi, err := f.Stat()
if err != nil {
......@@ -690,6 +696,56 @@ mainloop:
}
}
// Sync implements zodb.IStorageDriver.
//
// It stats the data file and compares its size with the indexed top
// position. If they match, the current head is returned right away. If the
// file shrank, an error is returned. Otherwise the call is queued in
// fs.syncv and waits for a head value to arrive on its queue, for the
// storage to go down, or for ctx to be canceled.
//
// Any error returned is wrapped as a "sync" operation error via fs.zerr.
func (fs *FileStorage) Sync(ctx context.Context) (head zodb.Tid, err error) {
	defer func() {
		if err == nil {
			return
		}
		err = fs.zerr("sync", nil, err)
	}()

	// snapshot indexed top position and current head
	fs.mu.RLock()
	topPos := fs.index.TopPos
	head = fs.txnhMax.Tid
	fs.mu.RUnlock()

	// check file size; if it is the same there was no new commits.
	fi, err := fs.file.Stat()
	if err != nil {
		return zodb.InvalidTid, err
	}

	fsize := fi.Size()
	if fsize == topPos {
		return head, nil // same as before
	}
	if fsize < topPos {
		// XXX add pack support?
		return zodb.InvalidTid, fmt.Errorf("file truncated (%d -> %d)", topPos, fsize)
	}

	// the file has more data than covered by current topPos. However that
	// might be in-progress transaction that will be aborted. Ask watcher
	// to give us feedback after it goes through one iteration to:
	// - stat the file once again, and
	// - advance as much as it can.
	//
	// syncq has capacity 1, so a single reply can be sent to it without
	// blocking even if we have already returned.
	syncq := make(chan zodb.Tid, 1)
	fs.syncMu.Lock()
	fs.syncv = append(fs.syncv, syncq)
	fs.syncMu.Unlock()

	select {
	case head = <-syncq:
		return head, nil

	case <-ctx.Done():
		return zodb.InvalidTid, ctx.Err()

	case <-fs.down:
		return zodb.InvalidTid, fs.downErr
	}
}
// --- open + rebuild index ---
// shutdown marks storage as no longer operational with specified reason.
......
......@@ -384,15 +384,15 @@ func TestWatch(t *testing.T) {
}
ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) {
checkHead := func(headOk zodb.Tid) {
t.Helper()
head, err := fs.LastTid(ctx); X(err)
if head != lastOk {
t.Fatalf("check last_tid: got %s; want %s", head, lastOk)
head, err := fs.Sync(ctx); X(err)
if head != headOk {
t.Fatalf("check head: got %s; want %s", head, headOk)
}
}
checkLastTid(at)
checkHead(at)
checkLoad := func(at zodb.Tid, oid zodb.Oid, dataOk string, serialOk zodb.Tid) {
t.Helper()
......@@ -441,7 +441,7 @@ func TestWatch(t *testing.T) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", δ.Tid, δ.Changev, at, objvWant)
}
checkLastTid(at)
checkHead(at)
// make sure we can load what was committed.
checkLoad(at, 0, data0, at)
......@@ -462,7 +462,7 @@ func TestOpenRecovery(t *testing.T) {
X := exc.Raiseif
main, err := ioutil.ReadFile("testdata/1.fs"); X(err)
index, err := ioutil.ReadFile("testdata/1.fs.index"); X(err)
lastTidOk := _1fs_dbEntryv[len(_1fs_dbEntryv)-1].Header.Tid
headOk := _1fs_dbEntryv[len(_1fs_dbEntryv)-1].Header.Tid
topPos := int64(_1fs_indexTopPos)
voteTail, err := ioutil.ReadFile("testdata/1voted.tail"); X(err)
......@@ -495,12 +495,12 @@ func TestOpenRecovery(t *testing.T) {
defer func() {
err = fs.Close(); X(err)
}()
if at0 != lastTidOk {
t.Fatalf("at0: %s ; expected: %s", at0, lastTidOk)
if at0 != headOk {
t.Fatalf("at0: %s ; expected: %s", at0, headOk)
}
head, err := fs.LastTid(ctx); X(err)
if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk)
head, err := fs.Sync(ctx); X(err)
if head != headOk {
t.Fatalf("head: %s ; expected: %s", head, headOk)
}
})
}
......
......@@ -52,10 +52,25 @@ type zeo struct {
}
func (z *zeo) LastTid(ctx context.Context) (zodb.Tid, error) {
z.mu.Lock()
defer z.mu.Unlock()
return z.lastTid, nil
// Sync issues the "lastTransaction" RPC and returns the transaction ID
// unpacked from its reply.
//
// On RPC failure, or if the reply cannot be unpacked as a tid, InvalidTid is
// returned and the error is wrapped into zodb.OpError with Op "sync".
func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) {
	defer func() {
		if err == nil {
			return
		}
		err = &zodb.OpError{URL: z.URL(), Op: "sync", Args: nil, Err: err}
	}()

	rpc := z.rpc("lastTransaction")
	xhead, err := rpc.call(ctx)
	if err != nil {
		return zodb.InvalidTid, err
	}

	head, ok := tidUnpack(xhead)
	if ok {
		return head, nil
	}

	return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead)
}
func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
......
......@@ -325,10 +325,34 @@ type IStorage interface {
// same as in IStorageDriver
URL() string
Close() error
LastTid(context.Context) (Tid, error)
Loader
Iterator
// similar to IStorage
// Sync syncs to storage and updates current view of it.
//
// After Sync, Head is guaranteed to give ID of last transaction
// committed to storage data as observed from some time _afterwards_
// Sync call was made. In particular for client-server case, Sync
// cannot retain cached view of storage and has to perform round-trip
// to the server.
Sync(context.Context) error
// Head returns ID of last committed transaction.
//
// Returned head is ID of last committed transaction as observed from
// some time _before_ Head call was made. In particular for
// client-server case, Head can return cached view of storage that was
// learned some time ago.
//
// Head is ↑=.
//
// Head is 0 if no transactions have been committed yet.
//
// Use Sync to synchronize with the storage.
Head() Tid
// additional to IStorageDriver
Prefetcher
Watcher
......@@ -355,10 +379,17 @@ type IStorageDriver interface {
// Close closes storage
Close() error
// LastTid returns the id of the last committed transaction.
// Sync syncs to storage and returns ID of last committed transaction.
//
// Returned head is ID of last transaction committed to storage data as
// observed from some time _afterwards_ Sync call was made. In particular
// for client-server case, Sync cannot return cached view of storage
// and has to perform round-trip to the server.
//
// Head is ↑=.
//
// If no transactions have been committed yet, LastTid returns 0.
LastTid(ctx context.Context) (Tid, error)
// Head is 0 if no transactions have been committed yet.
Sync(ctx context.Context) (head Tid, _ error)
Loader
Iterator
......
......@@ -41,12 +41,18 @@ var infov = []struct {name string; getParam paramFunc} {
}},
// TODO reenable size
// {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }},
{"head", zhead},
{"last_tid", func(ctx context.Context, stor zodb.IStorage) (string, error) {
tid, err := stor.LastTid(ctx)
return tid.String(), err
fmt.Fprintf(os.Stderr, "W: last_tid is deprecated alias for head\n")
return zhead(ctx, stor)
}},
}
// zhead syncs stor and returns its head formatted as a string.
//
// The head string is returned even when Sync fails; the Sync error is
// passed through unchanged as the second result.
func zhead(ctx context.Context, stor zodb.IStorage) (string, error) {
	syncErr := stor.Sync(ctx)
	head := stor.Head()
	return head.String(), syncErr
}
// {} parameter_name -> get_parameter(stor)
var infoDict = map[string]paramFunc{}
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!