Commit b1229367 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 583cfa66
...@@ -486,6 +486,10 @@ func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxn ...@@ -486,6 +486,10 @@ func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxn
panic("TODO") panic("TODO")
} }
func (c *Client) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
panic("TODO")
}
// ---- ZODB open/url support ---- // ---- ZODB open/url support ----
......
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -75,7 +75,7 @@ import ( ...@@ -75,7 +75,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr" // "lab.nexedi.com/kirr/go123/xerr"
) )
// FileStorage is a ZODB storage which stores data in simple append-only file // FileStorage is a ZODB storage which stores data in simple append-only file
...@@ -84,6 +84,11 @@ import ( ...@@ -84,6 +84,11 @@ import (
// It is on-disk compatible with FileStorage from ZODB/py. // It is on-disk compatible with FileStorage from ZODB/py.
type FileStorage struct { type FileStorage struct {
file *os.File file *os.File
// protects updates to index and to everything below - in other words
// to everything that depends on what current last transaction is.
mu sync.RWMutex
index *Index // oid -> data record position in transaction which last changed oid index *Index // oid -> data record position in transaction which last changed oid
// transaction headers for min/max transactions committed // transaction headers for min/max transactions committed
...@@ -96,12 +101,16 @@ type FileStorage struct { ...@@ -96,12 +101,16 @@ type FileStorage struct {
var _ zodb.IStorageDriver = (*FileStorage)(nil) var _ zodb.IStorageDriver = (*FileStorage)(nil)
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX must be under lock fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
} }
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) { func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
// XXX must be under lock fs.mu.RLock()
defer fs.mu.RUnlock()
lastOid, _ := fs.index.Last() // returns zero-value, if empty lastOid, _ := fs.index.Last() // returns zero-value, if empty
return lastOid, nil return lastOid, nil
} }
...@@ -150,7 +159,9 @@ func (fs *FileStorage) Load_XXXWithNextSerialXXX(_ context.Context, xid zodb.Xid ...@@ -150,7 +159,9 @@ func (fs *FileStorage) Load_XXXWithNextSerialXXX(_ context.Context, xid zodb.Xid
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial // FIXME kill nextSerial support after neo/py cache does not depend on next_serial
func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) { func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction which changed this oid // lookup in index position of oid data record within latest transaction which changed this oid
fs.mu.RLock()
dataPos, ok := fs.index.Get(xid.Oid) dataPos, ok := fs.index.Get(xid.Oid)
fs.mu.RUnlock()
if !ok { if !ok {
return nil, 0, 0, &zodb.NoObjectError{Oid: xid.Oid} return nil, 0, 0, &zodb.NoObjectError{Oid: xid.Oid}
} }
...@@ -231,7 +242,7 @@ const ( ...@@ -231,7 +242,7 @@ const (
zIterPreloaded // data for this iteration was already preloaded zIterPreloaded // data for this iteration was already preloaded
) )
// NextTxn iterates to next/previous transaction record according to iteration direction // NextTxn iterates to next/previous transaction record according to iteration direction.
func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, error) { func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, error) {
// TODO err -> OpError("iter", tidmin..tidmax) // TODO err -> OpError("iter", tidmin..tidmax)
switch { switch {
...@@ -259,7 +270,7 @@ func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator, ...@@ -259,7 +270,7 @@ func (zi *zIter) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIterator,
return &zi.iter.Txnh.TxnInfo, zi, nil return &zi.iter.Txnh.TxnInfo, zi, nil
} }
// NextData iterates to next data record and loads data content // NextData iterates to next data record and loads data content.
func (zi *zIter) NextData(_ context.Context) (*zodb.DataInfo, error) { func (zi *zIter) NextData(_ context.Context) (*zodb.DataInfo, error) {
// TODO err -> OpError("iter", tidmin..tidmax) // TODO err -> OpError("iter", tidmin..tidmax)
err := zi.iter.NextData() err := zi.iter.NextData()
...@@ -313,11 +324,12 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt ...@@ -313,11 +324,12 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt
// //
// if there is no such transaction returned error will be EOF. // if there is no such transaction returned error will be EOF.
func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) { func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) {
// XXX read snapshot under lock fs.mu.RLock()
// check for empty database // check for empty database
if fs.txnhMin.Len == 0 { if fs.txnhMin.Len == 0 {
// empty database - no such record // empty database - no such record
fs.mu.RUnlock()
return TxnHeader{}, io.EOF return TxnHeader{}, io.EOF
} }
...@@ -327,6 +339,8 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -327,6 +339,8 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
tmin.CloneFrom(&fs.txnhMin) tmin.CloneFrom(&fs.txnhMin)
tmax.CloneFrom(&fs.txnhMax) tmax.CloneFrom(&fs.txnhMax)
fs.mu.RUnlock()
if tmax.Tid < tid { if tmax.Tid < tid {
return TxnHeader{}, io.EOF // no such record return TxnHeader{}, io.EOF // no such record
} }
...@@ -383,7 +397,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -383,7 +397,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
return txnhFound, nil return txnhFound, nil
} }
// Iterate creates zodb-level iterator for tidMin..tidMax range // Iterate creates zodb-level iterator for tidMin..tidMax range.
func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator { func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
fsSeq := seqReadAt(fs.file) fsSeq := seqReadAt(fs.file)
...@@ -421,10 +435,35 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb. ...@@ -421,10 +435,35 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
return ziter return ziter
} }
// --- watcher ---
func (fs *FileStorage) watch() {
// XXX
}
func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
panic("TODO")
}
// --- open + rebuild index --- // --- open + rebuild index ---
// open opens FileStorage without loading index func (fs *FileStorage) Close() error {
func open(path string) (_ *FileStorage, err error) { // XXX stop watcher
err := fs.file.Close()
if err != nil {
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: err}
}
// TODO if opened !ro -> .saveIndex()
return nil
}
// Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
fs := &FileStorage{} fs := &FileStorage{}
f, err := os.Open(path) f, err := os.Open(path)
...@@ -438,6 +477,8 @@ func open(path string) (_ *FileStorage, err error) { ...@@ -438,6 +477,8 @@ func open(path string) (_ *FileStorage, err error) {
} }
}() }()
// XXX wrap err with "open <path>" ?
// check file magic // check file magic
fh := FileHeader{} fh := FileHeader{}
err = fh.Load(f) err = fh.Load(f)
...@@ -445,131 +486,67 @@ func open(path string) (_ *FileStorage, err error) { ...@@ -445,131 +486,67 @@ func open(path string) (_ *FileStorage, err error) {
return nil, err return nil, err
} }
// FIXME rework opening logic to support case when last txn was committed only partially // load index
fseq := seqReadAt(f)
// determine topPos from file size index, err := LoadIndexFile(f.Name() + ".index")
fi, err := f.Stat() if err == nil {
if err != nil { // index exists & loaded ok - quickly verify its sanity for last 100 transactions
return nil, err _, err = index.Verify(ctx, fseq, 100, nil/*no progress*/)
}
topPos := fi.Size()
// read tidMin/tidMax
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = okEOF(err) // e.g. it is EOF when file is empty
if err != nil {
return nil, err
}
err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF forward
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF {
if err == nil {
err = fmt.Errorf("%s: no EOF after topPos", f.Name())
}
return nil, fmt.Errorf("%s: %s", f.Name(), err)
}
// .LenPrev must be good or EOF backward
switch fs.txnhMax.LenPrev {
case -1:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
case 0:
// ok - EOF backward
default:
// .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil { if err != nil {
return nil, err index = nil // not sane - we will rebuild
} }
} }
return fs, nil
}
// Open opens FileStorage @path.
//
// TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
// open data file
fs, err := open(path)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
fs.file.Close() // XXX lclose
}
}()
// load-verify / rebuild index
err = fs.loadIndex(ctx)
if err != nil { if err != nil {
// index either did not exist, or corrupt or IO error - rebuild it from scratch
log.Print(err) log.Print(err)
log.Printf("%s: index recompute...", path) log.Printf("%s: index rebuild...", path)
fs.index, err = fs.computeIndex(ctx) index, err = BuildIndex(ctx, fseq, nil/*no progress; XXX log it? */)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO if opened !ro -> .saveIndex()
}
return fs, nil
}
func (fs *FileStorage) Close() error {
err := fs.file.Close()
if err != nil {
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: err}
}
// TODO if opened !ro -> .saveIndex()
return nil
}
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
// XXX lock?
return BuildIndex(ctx, seqReadAt(fs.file), nil/*no progress; XXX somehow log it? */)
}
// loadIndex loads on-disk index to RAM and verifies it against data lightly
func (fs *FileStorage) loadIndex(ctx context.Context) (err error) {
// XXX lock?
defer xerr.Contextf(&err, "%s", fs.file.Name())
index, err := LoadIndexFile(fs.file.Name() + ".index")
if err != nil {
return err
} }
topPos := fs.txnhMax.Pos + fs.txnhMax.Len // index loaded. In particular this gives us index.TopPos that is, possibly
if index.TopPos != topPos { // outdated, valid position for start of a transaction in the data file.
return fmt.Errorf("inconsistent index topPos: data=%d index=%d", topPos, index.TopPos) // Update the index starting from that till latest transaction.
} err = index.Update(ctx, fseq, -1, nil/*no progress; XXX log it? */)
// quickly verify index sanity for last 100 transactions
_, err = index.Verify(ctx, seqReadAt(fs.file), 100, nil/*no progress*/)
if err != nil { if err != nil {
return err return nil, err
} }
fs.index = index fs.index = index
return nil
}
// saveIndex flushes in-RAM index to disk // now we have the index covering till last transaction in data file.
func (fs *FileStorage) saveIndex() (err error) { // fill-in min/max txnh
// XXX lock? if index.TopPos > txnValidFrom {
defer xerr.Contextf(&err, "%s", fs.file.Name()) err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = noEOF(err)
if err != nil {
return nil, err
}
err = fs.index.SaveFile(fs.file.Name() + ".index") _ = fs.txnhMax.Load(f, index.TopPos, LoadAll)
if err != nil { // NOTE it will be EOF on stable storage, but it might be non-EOF
return err // if a txn-in-progress was committed only partially. We care only
// that we read .LenPrev ok.
switch fs.txnhMax.LenPrev {
case -1:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
case 0:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos)
default:
// .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
return nil, err
}
}
} }
// XXX fsync here? // there might be simultaneous updates to the data file from outside.
return nil // launch the watcher who will observe them.
//go fs.watcher()
return fs, nil
} }
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -32,12 +32,12 @@ import ( ...@@ -32,12 +32,12 @@ import (
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
) )
// FileHeader represents header of whole data file // FileHeader represents header of whole data file.
type FileHeader struct { type FileHeader struct {
Magic [4]byte Magic [4]byte
} }
// TxnHeader represents header of a transaction record // TxnHeader represents header of a transaction record.
type TxnHeader struct { type TxnHeader struct {
Pos int64 // position of transaction start Pos int64 // position of transaction start
LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load) LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load)
...@@ -52,7 +52,7 @@ type TxnHeader struct { ...@@ -52,7 +52,7 @@ type TxnHeader struct {
workMem []byte workMem []byte
} }
// DataHeader represents header of a data record // DataHeader represents header of a data record.
type DataHeader struct { type DataHeader struct {
Pos int64 // position of data record start Pos int64 // position of data record start
Oid zodb.Oid Oid zodb.Oid
...@@ -110,25 +110,26 @@ func (e *RecordError) Error() string { ...@@ -110,25 +110,26 @@ func (e *RecordError) Error() string {
} }
// err creates RecordError for transaction located at txnh.Pos in f // err creates RecordError for transaction located at txnh.Pos in f.
func (txnh *TxnHeader) err(f interface{}, subj string, err error) error { func (txnh *TxnHeader) err(f interface{}, subj string, err error) error {
return &RecordError{ioname(f), "transaction record", txnh.Pos, subj, err} return &RecordError{ioname(f), "transaction record", txnh.Pos, subj, err}
} }
// err creates RecordError for data record located at dh.Pos in f // err creates RecordError for data record located at dh.Pos in f.
func (dh *DataHeader) err(f interface{}, subj string, err error) error { func (dh *DataHeader) err(f interface{}, subj string, err error) error {
return &RecordError{ioname(f), "data record", dh.Pos, subj, err} return &RecordError{ioname(f), "data record", dh.Pos, subj, err}
} }
// ierr is an interface for something which can create errors. // ierr is an interface for something which can create errors.
//
// it is used by TxnHeader and DataHeader to create appropriate errors with their context. // it is used by TxnHeader and DataHeader to create appropriate errors with their context.
type ierr interface { type ierr interface {
err(f interface{}, subj string, err error) error err(f interface{}, subj string, err error) error
} }
// errf is syntactic shortcut for err and fmt.Errorf // errf is syntactic shortcut for err and fmt.Errorf.
func errf(f interface{}, e ierr, subj, format string, a ...interface{}) error { func errf(f interface{}, e ierr, subj, format string, a ...interface{}) error {
return e.err(f, subj, fmt.Errorf(format, a...)) return e.err(f, subj, fmt.Errorf(format, a...))
} }
...@@ -151,7 +152,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error { ...@@ -151,7 +152,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
_, err := r.ReadAt(fh.Magic[:], 0) _, err := r.ReadAt(fh.Magic[:], 0)
err = okEOF(err) err = okEOF(err)
if err != nil { if err != nil {
return err return fmt.Errorf("%sread magic: %s", ioprefix(r), err)
} }
if string(fh.Magic[:]) != Magic { if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic) return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic)
...@@ -169,12 +170,12 @@ func (txnh *TxnHeader) HeaderLen() int64 { ...@@ -169,12 +170,12 @@ func (txnh *TxnHeader) HeaderLen() int64 {
return TxnHeaderFixSize + int64(len(txnh.workMem)) return TxnHeaderFixSize + int64(len(txnh.workMem))
} }
// DataPos returns start position of data inside transaction record // DataPos returns start position of data inside transaction record.
func (txnh *TxnHeader) DataPos() int64 { func (txnh *TxnHeader) DataPos() int64 {
return txnh.Pos + txnh.HeaderLen() return txnh.Pos + txnh.HeaderLen()
} }
// DataLen returns length of all data inside transaction record container // DataLen returns length of all data inside transaction record container.
func (txnh *TxnHeader) DataLen() int64 { func (txnh *TxnHeader) DataLen() int64 {
return txnh.Len - txnh.HeaderLen() - 8 /* trailer redundant length */ return txnh.Len - txnh.HeaderLen() - 8 /* trailer redundant length */
} }
...@@ -329,7 +330,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -329,7 +330,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
return err return err
} }
// loadStrings makes sure strings that are part of transaction header are loaded // loadStrings makes sure strings that are part of transaction header are loaded.
func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error { func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
// XXX make it no-op if strings are already loaded? // XXX make it no-op if strings are already loaded?
...@@ -659,7 +660,8 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -659,7 +660,8 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
// LoadData loads data for the data record taking backpointers into account. // LoadData loads data for the data record taking backpointers into account.
// //
// NOTE on success dh state is changed to data header of original data transaction. // On success dh state is changed to data header of original data transaction.
//
// NOTE "deleted" records are indicated via returning buf with .Data=nil without error. // NOTE "deleted" records are indicated via returning buf with .Data=nil without error.
func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) { func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) {
// scan via backpointers // scan via backpointers
...@@ -686,7 +688,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) { ...@@ -686,7 +688,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) {
// --- raw iteration --- // --- raw iteration ---
// Iter is combined 2-level iterator over transaction and data records // Iter is combined 2-level iterator over transaction and data records.
type Iter struct { type Iter struct {
R io.ReaderAt R io.ReaderAt
Dir IterDir Dir IterDir
...@@ -728,13 +730,13 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error { ...@@ -728,13 +730,13 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error {
return err return err
} }
// NextData iterates to next data record header inside current transaction // NextData iterates to next data record header inside current transaction.
func (it *Iter) NextData() error { func (it *Iter) NextData() error {
return it.Datah.LoadNext(it.R, &it.Txnh) return it.Datah.LoadNext(it.R, &it.Txnh)
} }
// Iterate creates Iter to iterate over r starting from posStart in direction dir // Iterate creates Iter to iterate over r starting from posStart in direction dir.
func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter { func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter {
it := &Iter{R: r, Dir: dir, Txnh: TxnHeader{Pos: posStart}} it := &Iter{R: r, Dir: dir, Txnh: TxnHeader{Pos: posStart}}
switch dir { switch dir {
......
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -83,7 +83,7 @@ const ( ...@@ -83,7 +83,7 @@ const (
posValidMask uint64 = 1<<48 - 1 // 0x0000ffffffffffff posValidMask uint64 = 1<<48 - 1 // 0x0000ffffffffffff
) )
// IndexSaveError is the error type returned by index save routines // IndexSaveError is the error type returned by index save routines.
type IndexSaveError struct { type IndexSaveError struct {
Err error // error that occurred during the operation Err error // error that occurred during the operation
} }
...@@ -92,7 +92,7 @@ func (e *IndexSaveError) Error() string { ...@@ -92,7 +92,7 @@ func (e *IndexSaveError) Error() string {
return "index save: " + e.Err.Error() return "index save: " + e.Err.Error()
} }
// Save saves index to a writer // Save saves index to a writer.
func (fsi *Index) Save(w io.Writer) (err error) { func (fsi *Index) Save(w io.Writer) (err error) {
defer func() { defer func() {
if err == nil { if err == nil {
...@@ -201,7 +201,7 @@ func (fsi *Index) SaveFile(path string) error { ...@@ -201,7 +201,7 @@ func (fsi *Index) SaveFile(path string) error {
return nil return nil
} }
// IndexLoadError is the error type returned by index load routines // IndexLoadError is the error type returned by index load routines.
type IndexLoadError struct { type IndexLoadError struct {
Filename string // present if used IO object was with .Name() Filename string // present if used IO object was with .Name()
Pos int64 Pos int64
...@@ -220,7 +220,7 @@ func (e *IndexLoadError) Error() string { ...@@ -220,7 +220,7 @@ func (e *IndexLoadError) Error() string {
return s return s
} }
// LoadIndex loads index from a reader // LoadIndex loads index from a reader.
func LoadIndex(r io.Reader) (fsi *Index, err error) { func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64 var picklePos int64
defer func() { defer func() {
...@@ -388,7 +388,7 @@ func treeEqual(a, b *fsb.Tree) bool { ...@@ -388,7 +388,7 @@ func treeEqual(a, b *fsb.Tree) bool {
// --- build index from FileStorage data --- // --- build index from FileStorage data ---
// IndexUpdateProgress is data sent by Index.Update to notify about progress // IndexUpdateProgress is data sent by Index.Update to notify about progress.
type IndexUpdateProgress struct { type IndexUpdateProgress struct {
TopPos int64 // data range to update to; if = -1 -- till EOF TopPos int64 // data range to update to; if = -1 -- till EOF
TxnIndexed int // # transactions read/indexed so far TxnIndexed int // # transactions read/indexed so far
...@@ -410,8 +410,9 @@ type IndexUpdateProgress struct { ...@@ -410,8 +410,9 @@ type IndexUpdateProgress struct {
// which position in data the index could be updated. // which position in data the index could be updated.
// //
// On success returned error is nil and index.TopPos is set to either: // On success returned error is nil and index.TopPos is set to either:
// - topPos (if it is != -1), or //
// - r's position at which read got EOF (if topPos=-1). // - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1).
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) { func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) {
defer xerr.Contextf(&err, "%sreindex %v..%v", ioprefix(r), index.TopPos, topPos) defer xerr.Contextf(&err, "%sreindex %v..%v", ioprefix(r), index.TopPos, topPos)
...@@ -553,7 +554,7 @@ func indexCorrupt(f interface{}, format string, argv ...interface{}) *IndexCorru ...@@ -553,7 +554,7 @@ func indexCorrupt(f interface{}, format string, argv ...interface{}) *IndexCorru
return &IndexCorruptError{DataFileName: ioname(f), Detail: fmt.Sprintf(format, argv...)} return &IndexCorruptError{DataFileName: ioname(f), Detail: fmt.Sprintf(format, argv...)}
} }
// IndexVerifyProgress is data sent by Index.Verify to notify about progress // IndexVerifyProgress is data sent by Index.Verify to notify about progress.
type IndexVerifyProgress struct { type IndexVerifyProgress struct {
TxnTotal int // total # of transactions to verify; if = -1 -- whole data TxnTotal int // total # of transactions to verify; if = -1 -- whole data
TxnChecked int TxnChecked int
...@@ -575,8 +576,9 @@ type IndexVerifyProgress struct { ...@@ -575,8 +576,9 @@ type IndexVerifyProgress struct {
// exactly the same as if it was build anew for data in range ..index.TopPos . // exactly the same as if it was build anew for data in range ..index.TopPos .
// //
// Returned error is either: // Returned error is either:
// - of type *IndexCorruptError, when data in index was found not to match original data, or //
// - any other error type representing e.g. IO error when reading original data or something else. // - of type *IndexCorruptError, when data in index was found not to match original data, or
// - any other error type representing e.g. IO error when reading original data or something else.
func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) { func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
defer func() { defer func() {
if _, ok := err.(*IndexCorruptError); ok { if _, ok := err.(*IndexCorruptError); ok {
...@@ -672,7 +674,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres ...@@ -672,7 +674,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
return oidChecked, nil return oidChecked, nil
} }
// VerifyForFile checks index correctness against FileStorage data in file @ path // VerifyForFile checks index correctness against FileStorage data in file @ path.
// //
// See Verify for semantic description. // See Verify for semantic description.
func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) { func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
...@@ -691,7 +693,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr ...@@ -691,7 +693,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
return nil, err return nil, err
} }
topPos := fi.Size() // XXX there might be last TxnInprogress transaction topPos := fi.Size() // XXX there might be last TxnInprogress transaction XXX
if index.TopPos != topPos { if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos) return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
} }
......
...@@ -91,6 +91,9 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte ...@@ -91,6 +91,9 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte
panic("TODO") panic("TODO")
} }
func (z *zeo) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
panic("TODO")
}
// errorUnexpectedReply is returned by zLink.Call callers when reply was // errorUnexpectedReply is returned by zLink.Call callers when reply was
// received successfully, but is not what the caller expected. // received successfully, but is not what the caller expected.
...@@ -307,7 +310,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.I ...@@ -307,7 +310,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.I
return nil, fmt.Errorf("TODO write mode not implemented") return nil, fmt.Errorf("TODO write mode not implemented")
} }
zl, err := dialZLink(ctx, net, addr) zl, err := dialZLink(ctx, net, addr) // XXX + methodTable
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -54,19 +54,21 @@ var protoVersions = []string{ ...@@ -54,19 +54,21 @@ var protoVersions = []string{
// zLink is ZEO connection between client (local end) and server (remote end). // zLink is ZEO connection between client (local end) and server (remote end).
// //
// zLink provides service to make RPC requests. // zLink provides service to make and receive RPC requests.
// XXX and receive notification from server (server sends invalidations)
// //
// create zLink via dialZLink or handshake. // create zLink via dialZLink or handshake.
type zLink struct { type zLink struct {
link net.Conn // underlying network link net.Conn // underlying network
rxbuf rbuf.RingBuf // buffer for reading from link rxbuf rbuf.RingBuf // buffer for reading from link
// calls in-flight // our in-flight calls
callMu sync.Mutex callMu sync.Mutex
callTab map[int64]chan msg // msgid -> rxc for that call; nil when closed callTab map[int64]chan msg // msgid -> rxc for that call; nil when closed
callID int64 // ID for next call; incremented at every call callID int64 // ID for next call; incremented at every call
// methods peer can invoke
methTab map[string]func(interface{})
serveWg sync.WaitGroup // for serveRecv serveWg sync.WaitGroup // for serveRecv
down1 sync.Once down1 sync.Once
errClose error // error got from .link.Close() errClose error // error got from .link.Close()
...@@ -98,6 +100,8 @@ func (zl *zLink) shutdown(err error) { ...@@ -98,6 +100,8 @@ func (zl *zLink) shutdown(err error) {
for _, rxc := range callTab { for _, rxc := range callTab {
rxc <- msg{arg: nil} // notify link was closed XXX ok? or err explicitly? rxc <- msg{arg: nil} // notify link was closed XXX ok? or err explicitly?
} }
// XXX close watcher
}) })
} }
...@@ -138,24 +142,33 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -138,24 +142,33 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return err return err
} }
if m.method != ".reply" { // "invalidateTransaction", tid, oidv
// TODO add notification channel (server calls client by itself")
return fmt.Errorf(".%d: method=%q; expected \".reply\"", m.msgid, m.method)
}
// lookup call by msgid and dispatch result to waiter if m.method == ".reply" {
zl.callMu.Lock() // lookup call by msgid and dispatch result to waiter
rxc := zl.callTab[m.msgid] zl.callMu.Lock()
if rxc != nil { rxc := zl.callTab[m.msgid]
delete(zl.callTab, m.msgid) if rxc != nil {
delete(zl.callTab, m.msgid)
}
zl.callMu.Unlock()
if rxc == nil {
return fmt.Errorf(".%d: unexpected reply", m.msgid)
}
rxc <- m
return nil
} }
zl.callMu.Unlock()
if rxc == nil {
return fmt.Errorf(".%d: unexpected reply", m.msgid) // XXX currently only async/ no other flags handled
f := zl.methTab[m.method]
if f == nil {
return fmt.Errorf(".%d: unknown method=%q", m.msgid, m.method)
} }
rxc <- m f(m.arg)
return nil return nil
} }
...@@ -216,7 +229,7 @@ func pktDecode(pkb *pktBuf) (msg, error) { ...@@ -216,7 +229,7 @@ func pktDecode(pkb *pktBuf) (msg, error) {
} }
// call makes 1 RPC call to server, waits for reply and returns it. // Call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) { func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) {
// defer func() ... // defer func() ...
reply, err := zl._call(ctx, method, argv...) reply, err := zl._call(ctx, method, argv...)
...@@ -268,6 +281,12 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{}) ...@@ -268,6 +281,12 @@ func (zl *zLink) _call(ctx context.Context, method string, argv ...interface{})
return reply, nil return reply, nil
} }
// RegisterMethod registers f to be called when remote XXX
// FIXME -> provide methodTable to dial, so that it is available right from start without any race
func (zl *zLink) RegisterMethod(method string, f func(arg interface{})) {
// XXX only "async" (without reply)
// XXX
}
// ---- raw IO ---- // ---- raw IO ----
......
...@@ -323,11 +323,18 @@ func (e *OpError) Cause() error { ...@@ -323,11 +323,18 @@ func (e *OpError) Cause() error {
// IStorage is the interface provided by opened ZODB storage. // IStorage is the interface provided by opened ZODB storage.
type IStorage interface { type IStorage interface {
IStorageDriver //IStorageDriver
// same as in IStorageDriver
URL() string
Close() error
LastTid(context.Context) (Tid, error)
Loader
Iterator
// no watcher
// additional to IStorageDriver
Prefetcher Prefetcher
//Loader
//Iterator
//XXXNotifier() -> Notifier // dedicated notifier for every open? //XXXNotifier() -> Notifier // dedicated notifier for every open?
} }
...@@ -359,7 +366,9 @@ type IStorageDriver interface { ...@@ -359,7 +366,9 @@ type IStorageDriver interface {
Loader Loader
Iterator Iterator
Watcher
/*
// Notifier returns storage driver notifier. // Notifier returns storage driver notifier.
// //
// The notifier represents invalidation channel (notify about changes // The notifier represents invalidation channel (notify about changes
...@@ -372,6 +381,7 @@ type IStorageDriver interface { ...@@ -372,6 +381,7 @@ type IStorageDriver interface {
// XXX -> nil, if driver does not support notifications? // XXX -> nil, if driver does not support notifications?
// XXX or always support them, even with FileStorage (inotify)? // XXX or always support them, even with FileStorage (inotify)?
//Notifier() Notifier //Notifier() Notifier
*/
} }
// Loader provides functionality to load objects. // Loader provides functionality to load objects.
...@@ -435,6 +445,7 @@ type Committer interface { ...@@ -435,6 +445,7 @@ type Committer interface {
} }
/*
// Notifier allows to be notified of changes made to database by other clients. // Notifier allows to be notified of changes made to database by other clients.
type Notifier interface { type Notifier interface {
...@@ -444,6 +455,16 @@ type Notifier interface { ...@@ -444,6 +455,16 @@ type Notifier interface {
// XXX overflow -> special error // XXX overflow -> special error
Read(ctx context.Context) (Tid, []Oid, error) Read(ctx context.Context) (Tid, []Oid, error)
} }
*/
// Watcher allows to be notified of changes to database.
type Watcher interface {
// Watch waits-for and returns next event corresponding to comitted transaction.
//
// XXX queue overflow -> special error?
Watch(ctx context.Context) (Tid, []Oid, error)
}
// TODO: History(ctx, oid, size=1) // TODO: History(ctx, oid, size=1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment