Commit 55aaafdd authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5c3763ac
...@@ -58,18 +58,23 @@ type DataHeader struct { ...@@ -58,18 +58,23 @@ type DataHeader struct {
// if backpointer == 0 -> oid deleted // if backpointer == 0 -> oid deleted
//Data []byte //Data []byte
//DataRecPos uint64 // if Data == nil -> byte position of data record containing data //DataRecPos uint64 // if Data == nil -> byte position of data record containing data
// XXX include word0 ?
} }
const DataHeaderSize = 42 const DataHeaderSize = 42
// ErrDataRead is returned on data record read / decode errors // ErrDataRead is returned on data record read / decode errors
type ErrDataRead struct { type ErrDataRecord struct {
Pos int64 Pos int64
Err error Err error
} }
func (e *ErrDataRead) Error() string { func (e *ErrDataRead) Error() string {
return fmt.Sprintf("data read: record @%v: %v", e.Pos, e.Err) //return fmt.Sprintf("data read: record @%v: %v", e.Pos, e.Err)
// TODO read -> header | payload
return fmr.Sprintf("data record @%v: read: %v", e.Pos, e.Err)
// XXX -> data record @%v: %v ?
} }
// XXX -> zodb? // XXX -> zodb?
...@@ -81,7 +86,7 @@ var ErrVersionNonZero = errors.New("non-zero version") ...@@ -81,7 +86,7 @@ var ErrVersionNonZero = errors.New("non-zero version")
func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]byte) error { func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]byte) error {
n, err := r.ReadAt(tmpBuf[:], pos) n, err := r.ReadAt(tmpBuf[:], pos)
if n == DataHeaderSize { if n == DataHeaderSize {
err = nil // we don't care if it was EOF after full header read XXX ok? err = nil // we don't mind if it was EOF after full header read XXX ok?
} }
if err != nil { if err != nil {
...@@ -96,12 +101,14 @@ func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]b ...@@ -96,12 +101,14 @@ func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]b
dh.DataLen = binary.BigEndian.Uint64(tmpBuf[34:]) dh.DataLen = binary.BigEndian.Uint64(tmpBuf[34:])
if verlen != 0 { if verlen != 0 {
// XXX -> ...: header invalid: non-zero version
return &ErrDataRead{pos, ErrVersionNonZero} return &ErrDataRead{pos, ErrVersionNonZero}
} }
return nil return nil
} }
// XXX do we need Decode when decode() is there?
func (dh *DataHeader) Decode(r io.ReaderAt, pos int64) error { func (dh *DataHeader) Decode(r io.ReaderAt, pos int64) error {
var tmpBuf [DataHeaderSize]byte var tmpBuf [DataHeaderSize]byte
return dh.decode(r, pos, &tmpBuf) return dh.decode(r, pos, &tmpBuf)
...@@ -121,14 +128,73 @@ func NewFileStorage(path string) (*FileStorage, error) { ...@@ -121,14 +128,73 @@ func NewFileStorage(path string) (*FileStorage, error) {
// TODO read/recreate index // TODO read/recreate index
} }
// ErrOidLoad is returned when there is an error while loading oid
type ErrOidLoad struct {
Oid zodb.Oid
Err error
}
func (e *ErrOidLoad) Error() string {
// TODO include whole (=|<)tid:oid ?
return fmt.Sprintf("loading oid %v: %v", e.Oid, e.Err)
}
func (fs *FileStorage) LoadBefore(oid zodb.Oid, beforeTid zodb.Tid) (data []byte, tid zodb.Tid, err error) { func (fs *FileStorage) LoadBefore(oid zodb.Oid, beforeTid zodb.Tid) (data []byte, tid zodb.Tid, err error) {
// lookup in index position of oid data record with latest transaction who changed this oid // lookup in index position of oid data record within latest transaction who changed this oid
dataPos, ok := fs.index.Get(oid) dataPos, ok := fs.index.Get(oid)
if !ok { if !ok {
return nil, zodb.Tid(0), zodb.ErrOidMissing{Oid: oid} return nil, zodb.Tid(0), zodb.ErrOidMissing{Oid: oid}
// XXX -> &ErrOidLoad{oid, zodb.ErrOidMissing} ?
} }
dh := DataHeader{Tid: zodb.TidMax}
// search backwards for when we first have data record with tid < beforeTid // search backwards for when we first have data record with tid < beforeTid
for {
prevTid := dh.Tid
err = dh.Decode(fs.f, dataPos)
if err != nil {
return nil, zodb.Tid(0), &ErrOidLoad{oid, err}
}
// check data record consistency
if dh.Oid != oid {
// ... header invalid:
return nil, zodb.Tid(0), &ErrOidLoad{oid, &ErrDataRead{dataPos, "TODO unexpected oid")}
}
if dh.Tid >= prevTid { ... }
if dh.TxnPos >= dataPos - TxnHeaderSize { ... }
if dh.PrevDataRecPos >= dh.TxnPos - DataHeaderSize - 8 /* XXX */ { ... }
if dh.Tid < beforeTid {
break
}
// continue search
dataPos = dh.PrevDataRecPos
dataPos == 0 {
// no such oid revision
return ...&ErrOidLoad{oid, zodb.ErrOidRevMissing}
}
}
// found dh.Tid < beforeTid
// now read actual data / scan via backpointers
if dh.DataLen == 0 {
// backpointer - TODO
}
data = make([]byte, dh.DataLen) // TODO -> slab ?
n, err := fs.f.ReadAt(data, dataPos + DataHeaderSize)
if n == len(data) {
err = nil // we don't mind to get EOF after full data read XXX ok?
}
if err != nil {
return nil, zodb.Tid(0), &ErrOidLoad{oid, err}
}
return data, dh.Tid, nil
} }
func (fs *FileStorage) Close() error { func (fs *FileStorage) Close() error {
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
# TODO author/copyright # TODO author/copyright
"""generate reference database and index for tests""" """generate reference database and index for tests"""
# TODO generate transactions with backpointers
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB import DB from ZODB import DB
from persistent import Persistent from persistent import Persistent
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment