Commit 0dce582b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a00572f6
......@@ -20,8 +20,6 @@
package zodb
// cache management
//go:generate gotrace gen .
import (
"context"
"fmt"
......
......@@ -22,14 +22,14 @@
// FileStorage is a single file organized as a simple append-only log of
// transactions with data changes. Every transaction record consists of:
//
// - transaction record header represented by TxnHeader,
// - several data records corresponding to modified objects,
// - redundant transaction length at the end of transaction record.
// - transaction record header represented by TxnHeader,
// - several data records corresponding to modified objects,
// - redundant transaction length at the end of transaction record.
//
// Every data record consists of:
//
// - data record header represented by DataHeader,
// - actual data following the header.
// - data record header represented by DataHeader,
// - actual data following the header.
//
// The "actual data" in addition to raw content, can be a back-pointer
// indicating that the actual content should be retrieved from a past revision.
......@@ -72,8 +72,8 @@ import (
"os"
"sync"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
......@@ -84,12 +84,12 @@ import (
//
// It is on-disk compatible with FileStorage from ZODB/py.
type FileStorage struct {
file *os.File
index *Index // oid -> data record position in transaction which last changed oid
file *os.File
index *Index // oid -> data record position in transaction which last changed oid
// transaction headers for min/max transactions committed
// (both with .Len=0 & .Tid=0 if database is empty)
txnhMin TxnHeader
txnhMin TxnHeader
txnhMax TxnHeader
}
......@@ -222,8 +222,8 @@ type zIter struct {
// here to avoid allocations )
dhLoading DataHeader
datai zodb.DataInfo // ptr to this will be returned by .NextData
dataBuf *mem.Buf
datai zodb.DataInfo // ptr to this will be returned by .NextData
dataBuf *mem.Buf
}
type zIterFlags int
......@@ -332,7 +332,7 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
return TxnHeader{}, io.EOF // no such record
}
if tmin.Tid >= tid {
return tmin, nil // tmin satisfies
return tmin, nil // tmin satisfies
}
// now we know tid ∈ (tmin, tmax]
......@@ -395,7 +395,7 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
txnh, err := fs.findTxnRecord(fsSeq, tidMin)
switch {
case err == io.EOF:
ziter.zFlags |= zIterEOF // empty
ziter.zFlags |= zIterEOF // empty
return ziter
case err != nil:
......@@ -414,7 +414,7 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
iter.Datah.Pos = txnh.DataPos() // XXX dup wrt Iter.NextTxn
iter.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
iter.Dir = IterForward // TODO allow both ways iteration at ZODB level
iter.Dir = IterForward // TODO allow both ways iteration at ZODB level
ziter.zFlags |= zIterPreloaded
ziter.tidStop = tidMax
......@@ -435,7 +435,7 @@ func open(path string) (_ *FileStorage, err error) {
fs.file = f
defer func() {
if err != nil {
f.Close() // XXX -> lclose
f.Close() // XXX -> lclose
}
}()
......@@ -500,7 +500,7 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
}
defer func() {
if err != nil {
fs.file.Close() // XXX lclose
fs.file.Close() // XXX lclose
}
}()
......
......@@ -33,16 +33,16 @@ import (
// one database transaction record
type dbEntry struct {
Header TxnHeader
Entryv []txnEntry
Header TxnHeader
Entryv []txnEntry
}
// one entry inside transaction
type txnEntry struct {
Header DataHeader
rawData []byte // what is on disk, e.g. it can be backpointer
userData []byte // data client should see on load; `sameAsRaw` means same as RawData
DataTidHint zodb.Tid // data tid client should see on iter
Header DataHeader
rawData []byte // what is on disk, e.g. it can be backpointer
userData []byte // data client should see on load; `sameAsRaw` means same as RawData
DataTidHint zodb.Tid // data tid client should see on iter
}
var sameAsRaw = []byte{0}
......@@ -58,8 +58,8 @@ func (txe *txnEntry) Data() []byte {
// state of an object in the database for some particular revision
type objState struct {
tid zodb.Tid
data []byte // nil if obj was deleted
tid zodb.Tid
data []byte // nil if obj was deleted
}
......@@ -74,7 +74,7 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) {
URL: fs.URL(),
Op: "load",
Args: xid,
Err: &zodb.NoDataError{Oid: xid.Oid , DeletedAt: expect.tid},
Err: &zodb.NoDataError{Oid: xid.Oid, DeletedAt: expect.tid},
}
if !reflect.DeepEqual(err, errOk) {
t.Errorf("load %v: returned err unexpected: %v ; want: %v", xid, err, errOk)
......@@ -117,7 +117,7 @@ func xfsopen(t testing.TB, path string) *FileStorage {
}
func TestLoad(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") // TODO open read-only
fs := xfsopen(t, "testdata/1.fs") // TODO open read-only
defer exc.XRun(fs.Close)
// current knowledge of what was "before" for an oid as we scan over
......@@ -208,7 +208,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
}
for kdata := 0; ; kdata++ {
dataErrorf := func(format string, a...interface{}) {
dataErrorf := func(format string, a ...interface{}) {
t.Helper()
dsubj := fmt.Sprintf("dstep %v#%v", kdata, len(dbe.Entryv))
msg := fmt.Sprintf(format, a...)
......@@ -266,7 +266,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
}
func TestIterate(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") // TODO open ro
fs := xfsopen(t, "testdata/1.fs") // TODO open ro
defer exc.XRun(fs.Close)
// all []tids in test database
......@@ -302,7 +302,7 @@ func TestIterate(t *testing.T) {
}
func BenchmarkIterate(b *testing.B) {
fs := xfsopen(b, "testdata/1.fs") // TODO open ro
fs := xfsopen(b, "testdata/1.fs") // TODO open ro
defer exc.XRun(fs.Close)
ctx := context.Background()
......
......@@ -26,9 +26,9 @@ import (
"io"
"os"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xbufio"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xbytes"
......@@ -41,9 +41,9 @@ type FileHeader struct {
// TxnHeader represents header of a transaction record
type TxnHeader struct {
Pos int64 // position of transaction start
LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load)
Len int64 // whole transaction record length (see EOF/error rules in Load)
Pos int64 // position of transaction start
LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load)
Len int64 // whole transaction record length (see EOF/error rules in Load)
// transaction metadata itself
zodb.TxnInfo
......@@ -51,51 +51,51 @@ type TxnHeader struct {
// underlying memory for header loading and for user/desc/extension strings
// invariant: after successful TxnHeader load len(.workMem) = lenUser + lenDesc + lenExt
// as specified by on-disk header.
workMem []byte
workMem []byte
}
// DataHeader represents header of a data record
type DataHeader struct {
Pos int64 // position of data record start
Oid zodb.Oid
Tid zodb.Tid
PrevRevPos int64 // position of this oid's previous-revision data record
TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.)
DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer
// if backpointer == 0 -> oid deleted
Pos int64 // position of data record start
Oid zodb.Oid
Tid zodb.Tid
PrevRevPos int64 // position of this oid's previous-revision data record
TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.)
DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer
// if backpointer == 0 -> oid deleted
// underlying memory for header loading (to avoid allocations)
workMem [DataHeaderSize]byte
}
const (
Magic = "FS21" // every FileStorage file starts with this
Magic = "FS21" // every FileStorage file starts with this
// on-disk sizes
FileHeaderSize = 4
TxnHeaderFixSize = 8+8+1+2+2+2 // without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record
DataHeaderSize = 8+8+8+8+2+8
FileHeaderSize = 4
TxnHeaderFixSize = 8 + 8 + 1 + 2 + 2 + 2 // without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record
DataHeaderSize = 8 + 8 + 8 + 8 + 2 + 8
// txn/data pos that if < vvv are for sure invalid
txnValidFrom = FileHeaderSize
dataValidFrom = txnValidFrom + TxnHeaderFixSize
txnValidFrom = FileHeaderSize
dataValidFrom = txnValidFrom + TxnHeaderFixSize
// invalid length that indicates start of iteration for TxnHeader LoadNext/LoadPrev
// NOTE load routines check loaded fields to be valid and .Len in particular,
// so it can never come to us from outside to be this.
lenIterStart int64 = -0x1111111111111112 // = 0xeeeeeeeeeeeeeeee if unsigned
lenIterStart int64 = -0x1111111111111112 // = 0xeeeeeeeeeeeeeeee if unsigned
)
// RecordError represents error associated with operation on a record in
// FileStorage data file.
type RecordError struct {
Path string // path of the data file
Record string // record kind - "file header", "transaction record", "data record", ...
Pos int64 // position of record
Subj string // subject context for the error - e.g. "read" or "check"
Err error // actual error
Path string // path of the data file
Record string // record kind - "file header", "transaction record", "data record", ...
Pos int64 // position of record
Subj string // subject context for the error - e.g. "read" or "check"
Err error // actual error
}
func (e *RecordError) Cause() error {
......@@ -148,7 +148,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
_, err := r.ReadAt(fh.Magic[:], 0)
err = okEOF(err)
if err != nil {
return err
return err
}
if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%s: invalid fs1 magic %q", xio.Name(r), fh.Magic)
......@@ -198,8 +198,8 @@ func (txnh *TxnHeader) CloneFrom(txnh2 *TxnHeader) {
// flags for TxnHeader.Load
type TxnLoadFlags int
const (
LoadAll TxnLoadFlags = 0x00 // load whole transaction header
LoadNoStrings = 0x01 // do not load user/desc/ext strings
LoadAll TxnLoadFlags = 0x00 // load whole transaction header
LoadNoStrings = 0x01 // do not load user/desc/ext strings
)
// Load reads and decodes transaction record header @ pos.
......@@ -228,8 +228,8 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
work := txnh.workMem[:txnXHeaderFixSize]
txnh.Pos = pos
txnh.Len = -1 // read error
txnh.LenPrev = -1 // read error
txnh.Len = -1 // read error
txnh.LenPrev = -1 // read error
if pos < txnValidFrom {
bug(r, txnh, "Load() on invalid position")
......@@ -241,7 +241,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
if pos - 8 >= txnValidFrom {
// read together with previous's txn record redundant length
n, err = r.ReadAt(work, pos - 8)
n -= 8 // relative to pos
n -= 8 // relative to pos
if n >= 0 {
lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if lenPrev < TxnHeaderFixSize {
......@@ -259,14 +259,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
} else {
// read only current txn without previous record length
n, err = r.ReadAt(work[8:], pos)
txnh.LenPrev = 0 // EOF backward
txnh.LenPrev = 0 // EOF backward
}
if err != nil {
if err == io.EOF && n == 0 {
txnh.Len = 0 // EOF forward
return err // end of stream
txnh.Len = 0 // EOF forward
return err // end of stream
}
// EOF after txn header is not good - because at least
......@@ -291,8 +291,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
return checkErr(r, txnh, "invalid status: %q", txnh.Status)
}
luser := binary.BigEndian.Uint16(work[8+17:])
luser := binary.BigEndian.Uint16(work[8+17:])
ldesc := binary.BigEndian.Uint16(work[8+19:])
lext := binary.BigEndian.Uint16(work[8+21:])
......@@ -461,7 +460,7 @@ func (dh *DataHeader) Len() int64 {
//
// No prerequisite requirements are made to previous dh state.
func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
dh.Pos = -1 // ~ error
dh.Pos = -1 // ~ error
if pos < dataValidFrom {
bug(r, dh, "Load() on invalid position")
......@@ -473,8 +472,8 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
}
// XXX also check oid.Valid() ?
dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:])) // XXX -> zodb.Oid.Decode() ?
dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:])) // XXX -> zodb.Tid.Decode() ?
dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:]))
dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:]))
if !dh.Tid.Valid() {
return checkErr(r, dh, "invalid tid: %v", dh.Tid)
}
......@@ -488,7 +487,7 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
if dh.TxnPos + TxnHeaderFixSize > pos {
return checkErr(r, dh, "txn position not decreasing: %v", dh.TxnPos)
}
if dh.PrevRevPos != 0 { // zero means there is no previous revision
if dh.PrevRevPos != 0 { // zero means there is no previous revision
if dh.PrevRevPos < dataValidFrom {
return checkErr(r, dh, "invalid prev revision position: %v", dh.PrevRevPos)
}
......@@ -519,7 +518,7 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
// When there is no previous revision: io.EOF is returned.
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
if dh.PrevRevPos == 0 {
return io.EOF // no more previous revisions
return io.EOF // no more previous revisions
}
posCur := dh.Pos
......@@ -590,7 +589,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
}
if backPos == 0 {
return io.EOF // oid was deleted
return io.EOF // oid was deleted
}
posCur := dh.Pos
......@@ -686,16 +685,16 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) {
// Iter is combined 2-level iterator over transaction and data records
type Iter struct {
R io.ReaderAt
Dir IterDir
R io.ReaderAt
Dir IterDir
Txnh TxnHeader // current transaction record information
Datah DataHeader // current data record information
Txnh TxnHeader // current transaction record information
Datah DataHeader // current data record information
}
type IterDir int
const (
IterForward IterDir = iota
IterForward IterDir = iota
IterBackward
)
......
......@@ -102,7 +102,7 @@ func Dump(w io.Writer, path string, dir fs1.IterDir, d Dumper) (err error) {
err = it.NextTxn(fs1.LoadAll)
if err != nil {
if err == io.EOF {
err = nil // XXX -> okEOF(err)
err = nil
}
return err
}
......@@ -110,7 +110,7 @@ func Dump(w io.Writer, path string, dir fs1.IterDir, d Dumper) (err error) {
err = d.DumpTxn(buf, it)
if err != nil {
if err == io.EOF {
err = nil // XXX -> okEOF(err)
err = nil
}
return err
}
......@@ -163,7 +163,7 @@ func (d *DumperFsDump) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error {
err := it.NextData()
if err != nil {
if err == io.EOF {
err = nil // XXX -> okEOF(err)
err = nil
}
return err
}
......@@ -359,7 +359,7 @@ func (d *DumperFsTail) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error {
_, err := it.R.ReadAt(d.data, txnh.DataPos())
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF // XXX -> noEOF(err)
err = io.ErrUnexpectedEOF
}
// XXX dup wrt fs1.TxnHeader.err
return &fs1.RecordError{xio.Name(it.R), "transaction record", txnh.Pos, "read data payload", err}
......
// Code generated by gen-fsbtree from github.com/cznic/b 93348d0; DO NOT EDIT.
// (from patched version available at https://lab.nexedi.com/kirr/b.git)
//
// KEY=zodb.Oid VALUE=int64
// ---- 8< ----
......
// Code generated by gen-fsbtree from github.com/cznic/b 93348d0; DO NOT EDIT.
// (from patched version available at https://lab.nexedi.com/kirr/b.git)
//
// ---- 8< ----
package fsb
......
......@@ -28,12 +28,24 @@ de=16 # KEY+VALUE
o=24 # d.c, d.n, d.p
kd=$(( (4096 - $o - $de) / (2 * $de) ))
# git_upstream_url <repo> - show current branch upstream URL
git_upstream_url() {
repo=$1
head="`git -C $repo symbolic-ref --short HEAD`" # current branch - e.g. "t"
remote="`git -C $repo config --get branch.$head.remote`" # upstream name, e.g. "kirr"
url="`git -C $repo config --get remote.$remote.url`" # upstream URL
echo "$url"
}
b=github.com/cznic/b
Bdir=`go list -f '{{.Dir}}' $b`
Brev=`cd $Bdir && git describe --always`
Brev=`git -C $Bdir describe --always`
Bweb=`git_upstream_url $Bdir`
out=fsbtree.go
echo "// Code generated by gen-fsbtree from $b $Brev; DO NOT EDIT." >$out
echo "// (from patched version available at $Bweb)" >>$out
echo "//" >>$out
echo "// KEY=$KEY VALUE=$VALUE" >>$out
echo "// ---- 8< ----" >>$out
echo >>$out
......@@ -56,6 +68,8 @@ make -s -C $Bdir generic |sed \
# also extract dump() routine
out=fsbtree_util.go
echo "// Code generated by gen-fsbtree from $b $Brev; DO NOT EDIT." >$out
echo "// (from patched version available at $Bweb)" >>$out
echo "//" >>$out
echo "// ---- 8< ----" >>$out
echo >>$out
cat >>$out <<EOF
......
......@@ -23,7 +23,6 @@ import (
"io"
)
// XXX -> xio ?
// noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF
func noEOF(err error) error {
if err == io.EOF {
......
......@@ -239,8 +239,8 @@ type IStorageDriver interface {
// TODO: write mode
// Store(oid Oid, serial Tid, data []byte, txn ITransaction) error
// StoreKeepCurrent(oid Oid, serial Tid, txn ITransaction)
// Store(ctx, oid Oid, serial Tid, data []byte, txn ITransaction) error
// StoreKeepCurrent(ctx, oid Oid, serial Tid, txn ITransaction)
// TpcBegin(txn)
// TpcVote(txn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment